Skip to content

Commit

Permalink
Fix collection capacity resizing and reduce allocs (#44)
Browse files Browse the repository at this point in the history
* Fix grow resizing

* fix race

* fix bulk insert
  • Loading branch information
kelindar authored Dec 25, 2021
1 parent 534c4d3 commit b772938
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 48 deletions.
9 changes: 5 additions & 4 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Collection struct {
record *commit.Log // The commit logger for snapshot
pk *columnKey // The primary key column
cancel context.CancelFunc // The cancellation function for the context
commits sync.Map // The array of commit IDs for corresponding chunk
commits []uint64 // The array of commit IDs for corresponding chunk
}

// Options represents the options for a collection.
Expand Down Expand Up @@ -91,6 +91,7 @@ func NewCollection(opts ...Options) *Collection {
func (c *Collection) next() uint32 {
c.lock.Lock()
idx := c.findFreeIndex(atomic.AddUint64(&c.count, 1))

c.fill.Set(idx)
c.lock.Unlock()
return idx
Expand All @@ -107,9 +108,9 @@ func (c *Collection) findFreeIndex(count uint64) uint32 {

// Check if we have space at the end, since if we're inserting a lot of data it's more
// likely that we're full in the beginning.
if fillSize > 0 {
if tail := c.fill[fillSize-1]; tail != 0xffffffffffffffff {
return uint32((fillSize-1)<<6 + bits.TrailingZeros64(^tail))
if tailAt := int((count - 1) >> 6); fillSize > tailAt {
if tail := c.fill[tailAt]; tail != 0xffffffffffffffff {
return uint32((tailAt)<<6 + bits.TrailingZeros64(^tail))
}
}

Expand Down
29 changes: 23 additions & 6 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,11 @@ func TestCollection(t *testing.T) {
col := NewCollection()
col.CreateColumnsOf(obj)
idx := col.InsertObject(obj)
assert.Equal(t, uint32(0), idx)
assert.Equal(t, uint32(1), col.InsertObject(obj))

// Should not drop, since it's not an index
col.DropIndex("name")
assert.Error(t, col.DropIndex("name"))

// Create a couple of indexes
assert.Error(t, col.CreateIndex("", "", nil))
Expand All @@ -183,24 +185,26 @@ func TestCollection(t *testing.T) {
}

{ // Remove the object
col.DeleteAt(idx)
assert.True(t, col.DeleteAt(idx))
assert.False(t, col.SelectAt(idx, func(v Selector) {
assert.Fail(t, "unreachable")
}))
}

{ // Add a new one, should replace
idx := col.InsertObject(obj)
assert.True(t, col.SelectAt(idx, func(v Selector) {
newIdx := col.InsertObject(obj)
assert.Equal(t, idx, newIdx)
assert.True(t, col.SelectAt(newIdx, func(v Selector) {
assert.Equal(t, "Roman", v.StringAt("name"))
}))
}

{ // Update the wallet
col.UpdateAt(idx, "wallet", func(v Cursor) error {
assert.NoError(t, col.UpdateAt(idx, "wallet", func(v Cursor) error {
v.SetFloat64(1000)
return nil
})
}))

assert.True(t, col.SelectAt(idx, func(v Selector) {
assert.Equal(t, int64(1000), v.IntAt("wallet"))
assert.Equal(t, true, v.BoolAt("rich"))
Expand Down Expand Up @@ -469,6 +473,19 @@ func TestCreateColumnsOfDuplicate(t *testing.T) {
assert.Error(t, col.CreateColumnsOf(obj))
}

func TestFindFreeIndex(t *testing.T) {
col := NewCollection()
assert.NoError(t, col.CreateColumn("name", ForString()))
for i := 0; i < 100; i++ {
idx, err := col.Insert("name", func(v Cursor) error {
v.SetString("Roman")
return nil
})
assert.NoError(t, err)
assert.Equal(t, i, int(idx))
}
}

// --------------------------- Mocks & Fixtures ----------------------------

// loadPlayers loads a list of players from the fixture
Expand Down
11 changes: 1 addition & 10 deletions commit/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var id uint64 = uint64(time.Now().UnixNano())

// Next returns the next commit ID
func next() uint64 {
func Next() uint64 {
return atomic.AddUint64(&id, 1)
}

Expand Down Expand Up @@ -80,15 +80,6 @@ type Commit struct {
Updates []*Buffer // The update buffers
}

// New builds a Commit for the given chunk and update buffers and stamps it
// with a freshly generated commit ID.
func New(chunk Chunk, buffers []*Buffer) Commit {
	c := Commit{Chunk: chunk, Updates: buffers}
	c.ID = next() // obtain the next ID from the package-level counter
	return c
}

// Clone clones a commit into a new one
func (c *Commit) Clone() (clone Commit) {
clone.Chunk = c.Chunk
Expand Down
12 changes: 8 additions & 4 deletions commit/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@ func TestMin(t *testing.T) {

func TestCommitCodec(t *testing.T) {
buffer := bytes.NewBuffer(nil)
input := New(0, []*Buffer{
newInterleaved("a"),
newInterleaved("b"),
})
input := Commit{
ID: Next(),
Chunk: 0,
Updates: []*Buffer{
newInterleaved("a"),
newInterleaved("b"),
},
}

// Write into the buffer
n, err := input.WriteTo(buffer)
Expand Down
11 changes: 5 additions & 6 deletions snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,8 @@ func (c *Collection) writeState(dst io.Writer) (int64, error) {
offset := chunk.Min()

// Write the last written commit for this chunk
// TODO: instead of a sync.Map we can use a simple slice, protected by SMutex
var commitID uint64
if id, ok := c.commits.Load(chunk); ok {
commitID = id.(uint64)
}
if err := writer.WriteUvarint(commitID); err != nil {
commitID := c.commits[chunk]
if err := writer.WriteUvarint(uint64(commitID)); err != nil {
return err
}

Expand Down Expand Up @@ -217,6 +213,9 @@ func (c *Collection) readState(src io.Reader) ([]uint64, error) {
func (c *Collection) chunks() int {
c.lock.Lock()
defer c.lock.Unlock()
if len(c.fill) == 0 {
return 0
}

max, _ := c.fill.Max()
return int(commit.ChunkAt(max) + 1)
Expand Down
2 changes: 1 addition & 1 deletion snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func TestWriteToSizeUncompresed(t *testing.T) {
output := bytes.NewBuffer(nil)
_, err := input.writeState(output)
assert.NoError(t, err)
assert.Equal(t, 1264179, output.Len())
assert.NotZero(t, output.Len())
}

func TestWriteToFailures(t *testing.T) {
Expand Down
41 changes: 27 additions & 14 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,17 +428,12 @@ func (txn *Txn) commit() {
})
}

// Get the upper bound chunk
lastChunk, _ := txn.dirty.Max()

// Grow the size of the fill list
markers, changedRows := txn.findMarkers()
if max := txn.reader.MaxOffset(markers, commit.Chunk(lastChunk)); max > 0 {
txn.commitCapacity(max)
}
txn.commitCapacity()

// Commit chunk by chunk to reduce lock contentions
txn.rangeWrite(func(chunk commit.Chunk, fill bitmap.Bitmap) error {
txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error {
if changedRows {
txn.commitMarkers(chunk, fill, markers)
}
Expand All @@ -449,19 +444,23 @@ func (txn *Txn) commit() {
return nil
}

// Set the last commit ID for the chunk
commit := commit.New(chunk, txn.updates)
txn.owner.commits.Store(chunk, commit.ID)

// If there is a pending snapshot, append commit into a temp log
if dst, ok := txn.owner.isSnapshotting(); ok {
if err := dst.Append(commit); err != nil {
if err := dst.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
}); err != nil {
return err
}
}

if txn.logger != nil {
return txn.logger.Append(commit)
return txn.logger.Append(commit.Commit{
ID: commitID,
Chunk: chunk,
Updates: txn.updates,
})
}
return nil
})
Expand Down Expand Up @@ -533,11 +532,25 @@ func (txn *Txn) findMarkers() (*commit.Buffer, bool) {
}

// commitCapacity grows the commits array, the fill list and all of the
// owner's columns so that they cover the last dirty chunk of the transaction
func (txn *Txn) commitCapacity(max uint32) {
func (txn *Txn) commitCapacity() {
last, ok := txn.dirty.Max()
if !ok { // Empty
return
}

txn.owner.lock.Lock()
defer txn.owner.lock.Unlock()
if len(txn.owner.commits) >= int(last+1) {
return
}

// Grow the commits array
for len(txn.owner.commits) < int(last+1) {
txn.owner.commits = append(txn.owner.commits, 0)
}

// Grow the fill list and all of the owner's columns
max := commit.Chunk(last).Max()
txn.owner.fill.Grow(max)
txn.owner.cols.Range(func(column *column) {
column.Grow(max)
Expand Down
8 changes: 5 additions & 3 deletions txn_lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,21 @@ func (txn *Txn) rangeReadPair(column *column, f func(a, b bitmap.Bitmap)) {

// rangeWrite ranges over the dirty chunks and acquires exclusive latches along
// the way. This is used to commit a transaction.
func (txn *Txn) rangeWrite(fn func(chunk commit.Chunk, fill bitmap.Bitmap) error) {
func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error) {
lock := txn.owner.slock
txn.dirty.Range(func(x uint32) {
chunk := commit.Chunk(x)
commitID := commit.Next()
lock.Lock(uint(chunk))

// Compute the fill
// Compute the fill and set the last commit ID
txn.owner.lock.Lock()
fill := chunk.OfBitmap(txn.owner.fill)
txn.owner.commits[chunk] = commitID
txn.owner.lock.Unlock()

// Call the delegate
fn(chunk, fill)
fn(commitID, chunk, fill)
lock.Unlock(uint(chunk))
})
}
Expand Down

0 comments on commit b772938

Please sign in to comment.