Skip to content

Commit

Permalink
Implement PageBuffer (#996)
Browse files Browse the repository at this point in the history
PageBuffer can be used as an alternative to bytes.Buffer. A PageBuffer consists of many
pages (fixed-size []byte slices). It starts by writing to the first (empty) page. When that
page is full, it allocates the next page with double the size of the current page and
appends it to its collection of pages; it does not copy content out of the current page
(as bytes.Buffer does when it grows).
It also satisfies the io.Writer interface. A reader can be used to read data from it.
  • Loading branch information
ashish-goswami authored Sep 11, 2019
1 parent f88aa0c commit 86a77bb
Show file tree
Hide file tree
Showing 6 changed files with 459 additions and 6 deletions.
3 changes: 2 additions & 1 deletion manifest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
UserMeta: 0,
})
}
f.Write(b.Finish())
_, err = f.Write(b.Finish())
require.NoError(t, err, "unable to write to file.")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down
1 change: 1 addition & 0 deletions stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (w *sortedWriter) createTable(data []byte) error {
if _, err := fd.Write(data); err != nil {
return err
}

opts := table.Options{
LoadingMode: w.db.opt.TableLoadingMode,
ChkMode: w.db.opt.ChecksumVerificationMode,
Expand Down
26 changes: 25 additions & 1 deletion table/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func TestTableIndex(t *testing.T) {
}
builder.Add(k, vs)
}
f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(t, err, "unable to write to file")

opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
Expand All @@ -76,3 +77,26 @@ func TestTableIndex(t *testing.T) {
}
})
}

// BenchmarkBuilder measures the cost of building a table index from ~1.3M
// fixed-width key/value entries (roughly 64MB of data per iteration).
func BenchmarkBuilder(b *testing.B) {
	rand.Seed(time.Now().Unix())
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%032d", i))
	}

	val := make([]byte, 32)
	rand.Read(val)
	// val is already a []byte; the previous []byte(val) conversion was redundant.
	vs := y.ValueStruct{Value: val}

	keysCount := 1300000 // This number of entries consumes ~64MB of memory.

	// Exclude the setup above from the measured time.
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		opts := Options{BlockSize: 4 * 1024, BloomFalsePositive: 0.01}
		builder := NewTableBuilder(opts)

		// Use a distinct index name so the benchmark loop variable i is not shadowed.
		for j := 0; j < keysCount; j++ {
			builder.Add(key(j), vs)
		}

		_ = builder.Finish()
	}
}
12 changes: 8 additions & 4 deletions table/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ func buildTable(t *testing.T, keyValues [][]string) *os.File {
y.AssertTrue(len(kv) == 2)
b.Add(y.KeyWithTs([]byte(kv[0]), 0), y.ValueStruct{Value: []byte(kv[1]), Meta: 'A', UserMeta: 0})
}
f.Write(b.Finish())
_, err = f.Write(b.Finish())
require.NoError(t, err, "writing to file failed")
f.Close()
f, _ = y.OpenSyncedFile(filename, true)
return f
Expand Down Expand Up @@ -666,7 +667,8 @@ func TestTableBigValues(t *testing.T) {
builder.Add(key, vs)
}

f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(t, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
require.NoError(t, err, "unable to open table")
Expand Down Expand Up @@ -761,7 +763,8 @@ func BenchmarkReadMerged(b *testing.B) {
v := fmt.Sprintf("%d", id)
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v), Meta: 123, UserMeta: 0})
}
f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.OnTableAndBlockRead}
tbl, err := OpenTable(f, opts)
y.Check(err)
Expand Down Expand Up @@ -847,7 +850,8 @@ func getTableForBenchmarks(b *testing.B, count int) *Table {
builder.Add([]byte(k), y.ValueStruct{Value: []byte(v)})
}

f.Write(builder.Finish())
_, err = f.Write(builder.Finish())
require.NoError(b, err, "unable to write to file")
opts = Options{LoadingMode: options.LoadToRAM, ChkMode: options.NoVerification}
tbl, err := OpenTable(f, opts)
require.NoError(b, err, "unable to open table")
Expand Down
170 changes: 170 additions & 0 deletions y/y.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"reflect"
Expand Down Expand Up @@ -338,3 +339,172 @@ func BytesToU32Slice(b []byte) []uint32 {
hdr.Data = uintptr(unsafe.Pointer(&b[0]))
return u32s
}

// page is a single fixed-capacity chunk of a PageBuffer. Bytes are appended
// to buf until len(buf) reaches cap(buf); a page is never reallocated.
type page struct {
	buf []byte
}

// PageBuffer consists of many pages. A page is a wrapper over []byte. PageBuffer can act as a
// replacement for bytes.Buffer. Instead of having a single underlying buffer, it has multiple
// underlying buffers. Hence it avoids any copy during relocation (as happens in bytes.Buffer).
// PageBuffer allocates memory in pages. Once a page is full, it will allocate a page with double
// the size of the previous page. Its functions are not thread safe.
type PageBuffer struct {
	pages []*page // All pages in write order; only the last page accepts new writes.

	length       int // Length of PageBuffer.
	nextPageSize int // Size of next page to be allocated.
}

// NewPageBuffer returns a new PageBuffer whose first page has capacity pageSize.
// Subsequent pages double in size each time the previous one fills up.
func NewPageBuffer(pageSize int) *PageBuffer {
	first := &page{buf: make([]byte, 0, pageSize)}
	return &PageBuffer{
		pages:        []*page{first},
		nextPageSize: 2 * pageSize,
	}
}

// Write writes data to PageBuffer b. It returns the number of bytes written
// and any error encountered (always nil). When the current page fills up, a
// new page of double the previous size is appended and writing continues
// there — nothing already written is ever copied or moved.
func (b *PageBuffer) Write(data []byte) (int, error) {
	total := len(data)

	for len(data) > 0 {
		last := b.pages[len(b.pages)-1]

		// Fill whatever free capacity remains in the last page.
		n := copy(last.buf[len(last.buf):cap(last.buf)], data)
		last.buf = last.buf[:len(last.buf)+n]
		b.length += n
		data = data[n:]

		// Anything left over goes into a fresh, larger page.
		if len(data) > 0 {
			b.pages = append(b.pages, &page{buf: make([]byte, 0, b.nextPageSize)})
			b.nextPageSize *= 2
		}
	}

	return total, nil
}

// WriteByte appends a single byte to the PageBuffer and returns any error
// encountered by the underlying Write (always nil).
func (b *PageBuffer) WriteByte(data byte) error {
	var one [1]byte
	one[0] = data
	_, err := b.Write(one[:])
	return err
}

// Len returns the total number of bytes written to the PageBuffer so far.
func (b *PageBuffer) Len() int {
	return b.length
}

// pageForOffset returns pageIdx and startIdx for the offset: pageIdx is the
// index (into b.pages) of the page containing the byte at offset, and
// startIdx is that byte's position within the page's buf.
//
// NOTE(review): offset must be strictly less than b.length or AssertTrue
// panics. As a consequence callers such as Truncate and NewReaderAt cannot
// pass offset == b.Len() — confirm that boundary is never needed.
func (b *PageBuffer) pageForOffset(offset int) (int, int) {
	AssertTrue(offset < b.length)

	var pageIdx, startIdx, sizeNow int
	for i := 0; i < len(b.pages); i++ {
		cp := b.pages[i]

		// If the target byte lies beyond this page, accumulate this page's
		// length and keep scanning; otherwise the offset falls inside it.
		if sizeNow+len(cp.buf)-1 < offset {
			sizeNow += len(cp.buf)
		} else {
			pageIdx = i
			startIdx = offset - sizeNow
			break
		}
	}

	return pageIdx, startIdx
}

// Truncate truncates PageBuffer to length n. Pages beyond the one containing
// offset n are dropped, and that page is shortened in place; nextPageSize is
// deliberately left unchanged.
//
// NOTE(review): pageForOffset asserts n < b.length, so Truncate(b.Len())
// panics — confirm callers never truncate to the current length.
func (b *PageBuffer) Truncate(n int) {
	pageIdx, startIdx := b.pageForOffset(n)
	// For simplicity of the code reject extra pages. These pages can be kept.
	b.pages = b.pages[:pageIdx+1]
	cp := b.pages[len(b.pages)-1]
	cp.buf = cp.buf[:startIdx]
	b.length = n
}

// Bytes flattens the whole buffer into a single freshly-allocated []byte of
// length b.Len(). The pages themselves are left untouched.
func (b *PageBuffer) Bytes() []byte {
	out := make([]byte, 0, b.length)
	for _, pg := range b.pages {
		out = append(out, pg.buf...)
	}
	return out
}

// WriteTo streams the whole buffer to w, one page at a time. It returns the
// number of bytes written and the first error encountered, stopping early on
// a failed write.
func (b *PageBuffer) WriteTo(w io.Writer) (int64, error) {
	var total int64
	for _, pg := range b.pages {
		n, err := w.Write(pg.buf)
		total += int64(n)
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

// NewReaderAt returns a PageBufferReader positioned at the given byte offset
// into the buffer. Note pageForOffset panics when offset >= b.Len().
func (b *PageBuffer) NewReaderAt(offset int) *PageBufferReader {
	pi, si := b.pageForOffset(offset)
	r := &PageBufferReader{buf: b}
	r.pageIdx = pi
	r.startIdx = si
	return r
}

// PageBufferReader is a reader for PageBuffer. It keeps a cursor (page index
// plus offset inside that page) that advances as data is read.
type PageBufferReader struct {
	buf      *PageBuffer // Underlying page buffer.
	pageIdx  int         // Idx of page from where it will start reading.
	startIdx int         // Idx inside page - buf.pages[pageIdx] from where it will start reading.
}

// Read reads up to len(p) bytes into p. It returns the number of bytes read
// and any error encountered. When no bytes can be read (cursor is at the end
// of the buffer, or len(p) == 0) it returns (0, io.EOF).
func (r *PageBufferReader) Read(p []byte) (int, error) {
	// Check if there is enough to Read.
	pc := len(r.buf.pages)

	read := 0
	for r.pageIdx < pc && read < len(p) {
		cp := r.buf.pages[r.pageIdx] // Current Page.
		endIdx := len(cp.buf)        // Last Idx up to which we can read from this page.

		n := copy(p[read:], cp.buf[r.startIdx:endIdx])
		read += n
		r.startIdx += n

		// Instead of len(cp.buf), we compare with cap(cp.buf). This ensures that we move to next
		// page only when we have read all data. Reading from last page is an edge case. We don't
		// want to move to next page until last page is full to its capacity.
		if r.startIdx >= cap(cp.buf) {
			// We should move to next page.
			r.pageIdx++
			r.startIdx = 0
			continue
		}

		// When last page is not full to its capacity and we have read all data up to its
		// length, just break out of the loop.
		if r.pageIdx == pc-1 {
			break
		}
	}

	if read == 0 {
		return read, io.EOF
	}

	return read, nil
}
Loading

0 comments on commit 86a77bb

Please sign in to comment.