Skip to content

Commit

Permalink
Basic compaction logic. Update table. Add SeekToLast
Browse files Browse the repository at this point in the history
  • Loading branch information
jchiu255 committed Mar 7, 2017
1 parent 453ba6b commit 04490f9
Show file tree
Hide file tree
Showing 7 changed files with 373 additions and 135 deletions.
198 changes: 154 additions & 44 deletions db/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,58 @@ package db

import (
"bytes"
// "fmt"
"io/ioutil"
"os"
"sort"
"sync"
// "sync"

"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
)

// TableController corresponds to a file or table on disk.
type TableController struct {
const (
baseSize = 1 << 20
maxLevels = 10
)

var (
levels []*levelWrapper
)

type tableWrapper struct {
smallest, biggest []byte // Smallest and largest keys.
fd *os.File
size uint64 // Size of file.
tbl *table.Table
table *table.Table
markedForDelete bool
}

type LevelController struct {
sync.RWMutex // For now, when merging a table from level L to L+1, we will lock level L+1.
type levelWrapper struct {
level int
maxSize int
tables []*tableWrapper
}

level int
size uint64
tbl []*TableController // For level>=1, keep this sorted and non-overlapping.
next *LevelController
// init seeds the package-global level hierarchy: maxLevels levels whose
// maxSize starts at baseSize for level 0 and grows by a factor of 10 per level.
func init() {
	levels = make([]*levelWrapper, maxLevels)
	size := baseSize
	for i := range levels {
		levels[i] = &levelWrapper{
			level:   i,
			maxSize: size,
		}
		size *= 10
	}
}

// overlappingInputs returns a slice of s.tbm for tables that overlap with key range.
// Note that this is a closed interval [begin, end].
func (s *LevelController) overlappingInputs(begin, end []byte) []*TableController {
// overlappingTables returns a slice of s.tables that overlap with closed interval [begin, end].
func (s *levelWrapper) overlappingTables(begin, end []byte) []*tableWrapper {
y.AssertTrue(s.level > 0)
var out []*TableController
var out []*tableWrapper
// Not that many files. Just do simple linear search. TODO: Consider binary search.
for _, t := range s.tbl {
for _, t := range s.tables {
if bytes.Compare(begin, t.biggest) <= 0 && bytes.Compare(end, t.smallest) >= 0 {
out = append(out, t)
}
Expand All @@ -42,45 +62,135 @@ func (s *LevelController) overlappingInputs(begin, end []byte) []*TableControlle
}

// sortTable sorts tables for the level. ASSUME write lock.
func (s *LevelController) sortTables() {
func (s *levelWrapper) sortTables() {
y.AssertTrue(s.level >= 1)
sort.Slice(s.tbl, func(i, j int) bool {
return bytes.Compare(s.tbl[i].smallest, s.tbl[j].smallest) < 0
sort.Slice(s.tables, func(i, j int) bool {
return bytes.Compare(s.tables[i].smallest, s.tables[j].smallest) < 0
})
}

// getTables extracts the underlying *table.Table from each wrapper.
// Will not be needed if we move ConcatIterator, MergingIterator to this package.
func getTables(tables []*tableWrapper) []*table.Table {
	// Pre-size: the output length is known, so avoid repeated append growth.
	out := make([]*table.Table, 0, len(tables))
	for _, t := range tables {
		out = append(out, t.table)
	}
	return out
}

// newTableWrapper wraps an open table file, recording the table's smallest and
// biggest keys by seeking its iterator to the first and last entries.
// Panics (via AssertTrue) if the table is empty or contains an empty key.
func newTableWrapper(f *os.File, t *table.Table) *tableWrapper {
	out := &tableWrapper{
		fd:    f,
		table: t,
	}
	it := t.NewIterator()
	it.SeekToFirst()
	y.AssertTrue(it.Valid()) // A table must contain at least one entry.
	// KV interface is kind of awkward...
	// NOTE(review): k may alias the iterator's internal buffer — confirm KV hands
	// out stable bytes; otherwise smallest/biggest should go through doCopy.
	it.KV(func(k, v []byte) {
		out.smallest = k
	})
	it.SeekToLast()
	y.AssertTrue(it.Valid())
	it.KV(func(k, v []byte) {
		out.biggest = k
	})
	// Make sure we did populate smallest and biggest.
	y.AssertTrue(len(out.smallest) > 0) // We do not allow empty keys...
	y.AssertTrue(len(out.biggest) > 0)
	return out
}

// doCopy returns an independent copy of src. The result never aliases src's
// backing array, so callers may retain it across iterator advances.
// Note: the previous AssertTrue on copy's return value was redundant — the
// built-in copy always transfers len(src) bytes when len(dst) == len(src).
func doCopy(src []byte) []byte {
	out := make([]byte, len(src))
	copy(out, src)
	return out
}

// compact merges t:=s.tbl[idx] to the next level.
// While merging, t remains available for reading.
// No one should be allowed to merge into this level as it might write into t.
func (s *LevelController) compact(idx int) error {
y.AssertTrue(s.next != nil)
y.AssertTrue(idx >= 0 && idx < len(s.tbl))
func (s *levelWrapper) compact(idx int) error {
y.AssertTrue(s.level+1 < len(levels)) // Make sure s is not the last level.
y.AssertTrue(idx >= 0 && idx < len(s.tables))

// t is table to be merged to next level.
t := s.tbl[idx]

// s.next.Lock()
// defer s.next.Unlock()
s.next.sortTables()

inputs := s.next.overlappingInputs(t.smallest, t.biggest)
if len(inputs) == 0 {
// Just move file to next level. TODO: What to lock? Need to CAS?
s.next.tbl = append(s.next.tbl, t) // Add first then delete.
s.tbl = append(s.tbl[:idx], s.tbl[idx+1:]...)
return
t := s.tables[idx]
nl := levels[s.level+1] // Next level.
nl.sortTables() // TODO: Not many items, but can avoid sorting.

tables := nl.overlappingTables(t.smallest, t.biggest)
if len(tables) == 0 {
// No overlapping tables with next level. Just move this file to the next level.
nl.tables = append(nl.tables, t)
s.tables = append(s.tables[:idx], s.tables[idx+1:]...)
return nil
}
// There is at least one overlapping table.
// If we exceed
it := t.tbl.NewIterator()
it.SeekToFirst()
if err := it.Error() != nil; err != nil {
return err

for _, t := range tables {
t.markedForDelete = true
}
for ; it.Valid(); it.Next() {
if
it1 := table.NewConcatIterator([]*table.Table{t.table})
it2 := table.NewConcatIterator(getTables(tables))
it := table.NewMergingIterator(it1, it2)
// Currently, when the iterator is constructed, we automatically SeekToFirst.
// We may not want to do that.
var newTables []*tableWrapper
var builder table.TableBuilder
builder.Reset()

var lastKey []byte

finishTable := func() error {
fd, err := ioutil.TempFile("", "badger")
if err != nil {
return err
}
fd.Write(builder.Finish())
builder.Reset()
table, err := table.OpenTable(fd)
if err != nil {
return nil
}
newTables = append(newTables, newTableWrapper(fd, table))
return nil
}
// for _, input := range inputs {

// }
for ; it.Valid(); it.Next() {
if builder.FinalSize() > nl.maxSize {
if err := finishTable(); err != nil {
return err
}
}
kSlice, vSlice := it.KeyValue()
// We need to make copies of these as table might use them as "last".
key := doCopy(kSlice)
val := doCopy(vSlice)
// fmt.Printf("key=%s val=%s lastKey=%s\n", string(key), string(val), string(lastKey))
if bytes.Equal(key, lastKey) {
// Ignore duplicate keys. The first iterator takes precedence.
continue
}
if err := builder.Add(key, val); err != nil {
return err
}
lastKey = key
}
if builder.Empty() {
return nil
}
if err := finishTable(); err != nil {
return err
}
// newTables is now populated. Add them to level "nl" and delete old tables. Need to lock...
nlTables := nl.tables[:0]
for _, t := range nl.tables {
if !t.markedForDelete {
nlTables = append(nlTables, t)
}
}
nlTables = append(nlTables, newTables...)
nl.tables = nlTables
s.tables = append(s.tables[:idx], s.tables[idx+1:]...) // Delete table on this level.
return nil
}
86 changes: 86 additions & 0 deletions db/compact_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package db

import (
// "fmt"
"io/ioutil"
"sort"
"testing"

"github.com/stretchr/testify/require"

"github.com/dgraph-io/badger/table"
"github.com/dgraph-io/badger/y"
)

// buildTable writes keyValues (n pairs of [key, value]) into a temp-file-backed
// table, sorting pairs by key first, and returns its wrapper.
func buildTable(t *testing.T, keyValues [][]string) *tableWrapper {
	b := table.TableBuilder{}
	b.Reset()
	f, err := ioutil.TempFile("", "badger")
	require.NoError(t, err)

	sort.Slice(keyValues, func(i, j int) bool {
		return keyValues[i][0] < keyValues[j][0]
	})
	for _, kv := range keyValues {
		y.AssertTrue(len(kv) == 2)
		require.NoError(t, b.Add([]byte(kv[0]), []byte(kv[1])))
	}
	// Check the write error: a short write would corrupt the table fixture.
	_, err = f.Write(b.Finish())
	require.NoError(t, err)
	// Renamed from "table" to avoid shadowing the table package.
	tbl, err := table.OpenTable(f)
	require.NoError(t, err)
	return newTableWrapper(f, tbl)
}

// extractTable reads back every key-value pair from tw's table, in iterator
// order, as [key, value] string pairs.
// Parameter renamed from "table" to avoid shadowing the table package.
func extractTable(tw *tableWrapper) [][]string {
	var out [][]string
	it := tw.table.NewIterator()
	for it.SeekToFirst(); it.Valid(); it.Next() {
		it.KV(func(k, v []byte) {
			out = append(out, []string{string(k), string(v)})
		})
	}
	return out
}

// TestCompact tests most basic compaction logic: a level-0 table overlapping
// two of three level-1 tables. Duplicate keys resolve in favor of level 0,
// and the non-overlapping level-1 table is left untouched.
func TestCompact(t *testing.T) {
	t0 := buildTable(t, [][]string{
		{"k2", "z2"},
		{"k22", "z22"},
		{"k5", "z5"},
	})
	t1a := buildTable(t, [][]string{
		{"k0", "v0"},
	})
	t1b := buildTable(t, [][]string{
		{"k1", "v1"},
		{"k2", "v2"},
	})
	t1c := buildTable(t, [][]string{
		{"k3", "v3"},
		{"k4", "v4"},
	})

	levels[0].tables = []*tableWrapper{t0}
	levels[1].tables = []*tableWrapper{t1a, t1b, t1c}
	// Check compact's error: it was previously ignored, which could mask failures.
	require.NoError(t, levels[0].compact(0))

	require.Len(t, levels[1].tables, 2)
	require.Empty(t, levels[0].tables)

	levels[1].sortTables()

	require.EqualValues(t, [][]string{
		{"k0", "v0"},
	}, extractTable(levels[1].tables[0]))

	require.EqualValues(t, [][]string{
		{"k1", "v1"},
		{"k2", "z2"},
		{"k22", "z22"},
		{"k3", "v3"},
		{"k4", "v4"},
		{"k5", "z5"},
	}, extractTable(levels[1].tables[1]))
}
Empty file removed db/iterator.go
Empty file.
15 changes: 12 additions & 3 deletions table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ func (h header) Size() int {
type TableBuilder struct {
counter int

// TODO: Now that each file is one table, this table can get really big. The builder will
// have to be initialized with a file object that we can keep appending to.
// Builder cannot afford to store everything in memory.
// And that is not needed because the header is at the end.
buf []byte

// TODO: Consider removing this var. It just tracks size of buf.
pos int

baseKey []byte
Expand All @@ -51,6 +57,8 @@ type TableBuilder struct {
prevOffset int
}

// Empty reports whether the builder's buffer currently holds no data.
func (b *TableBuilder) Empty() bool { return len(b.buf) == 0 }

func (b *TableBuilder) Reset() {
b.counter = 0
// if cap(b.buf) < int(tableSize) {
Expand Down Expand Up @@ -116,9 +124,10 @@ func (b *TableBuilder) Add(key, value []byte) error {
return nil
}

// length returns the exact final size of the array, given its current state.
// Takes into account the header which has not been written into b.buf.
func (b *TableBuilder) length() int {
// FinalSize returns the *rough* final size of the array, counting the header which is not yet written.
// TODO: Look into why there is a discrepancy. I suspect it is because of Write(empty, empty)
// at the end. The diff can vary.
func (b *TableBuilder) FinalSize() int {
return b.pos + 6 /* empty header */ + 4*len(b.restarts) + 8 // 8 = end of buf offset + len(restarts).
}

Expand Down
Loading

0 comments on commit 04490f9

Please sign in to comment.