diff --git a/collection.go b/collection.go index d3d66a6..1fd32e6 100644 --- a/collection.go +++ b/collection.go @@ -38,7 +38,7 @@ type Collection struct { record *commit.Log // The commit logger for snapshot pk *columnKey // The primary key column cancel context.CancelFunc // The cancellation function for the context - commits sync.Map // The array of commit IDs for corresponding chunk + commits []uint64 // The array of commit IDs for corresponding chunk } // Options represents the options for a collection. @@ -91,6 +91,7 @@ func NewCollection(opts ...Options) *Collection { func (c *Collection) next() uint32 { c.lock.Lock() idx := c.findFreeIndex(atomic.AddUint64(&c.count, 1)) + c.fill.Set(idx) c.lock.Unlock() return idx @@ -107,9 +108,9 @@ func (c *Collection) findFreeIndex(count uint64) uint32 { // Check if we have space at the end, since if we're inserting a lot of data it's more // likely that we're full in the beginning. - if fillSize > 0 { - if tail := c.fill[fillSize-1]; tail != 0xffffffffffffffff { - return uint32((fillSize-1)<<6 + bits.TrailingZeros64(^tail)) + if tailAt := int((count - 1) >> 6); fillSize > tailAt { + if tail := c.fill[tailAt]; tail != 0xffffffffffffffff { + return uint32((tailAt)<<6 + bits.TrailingZeros64(^tail)) } } diff --git a/collection_test.go b/collection_test.go index 70e6c36..b39e69b 100644 --- a/collection_test.go +++ b/collection_test.go @@ -166,9 +166,11 @@ func TestCollection(t *testing.T) { col := NewCollection() col.CreateColumnsOf(obj) idx := col.InsertObject(obj) + assert.Equal(t, uint32(0), idx) + assert.Equal(t, uint32(1), col.InsertObject(obj)) // Should not drop, since it's not an index - col.DropIndex("name") + assert.Error(t, col.DropIndex("name")) // Create a couple of indexes assert.Error(t, col.CreateIndex("", "", nil)) @@ -183,24 +185,26 @@ func TestCollection(t *testing.T) { } { // Remove the object - col.DeleteAt(idx) + assert.True(t, col.DeleteAt(idx)) assert.False(t, col.SelectAt(idx, func(v Selector) { assert.Fail(t, "unreachable") })) } { // Add a new one, should replace - idx := col.InsertObject(obj) - assert.True(t, col.SelectAt(idx, func(v Selector) { + newIdx := col.InsertObject(obj) + assert.Equal(t, idx, newIdx) + assert.True(t, col.SelectAt(newIdx, func(v Selector) { assert.Equal(t, "Roman", v.StringAt("name")) })) } { // Update the wallet - col.UpdateAt(idx, "wallet", func(v Cursor) error { + assert.NoError(t, col.UpdateAt(idx, "wallet", func(v Cursor) error { v.SetFloat64(1000) return nil - }) + })) + assert.True(t, col.SelectAt(idx, func(v Selector) { assert.Equal(t, int64(1000), v.IntAt("wallet")) assert.Equal(t, true, v.BoolAt("rich")) @@ -469,6 +473,19 @@ func TestCreateColumnsOfDuplicate(t *testing.T) { assert.Error(t, col.CreateColumnsOf(obj)) } +func TestFindFreeIndex(t *testing.T) { + col := NewCollection() + assert.NoError(t, col.CreateColumn("name", ForString())) + for i := 0; i < 100; i++ { + idx, err := col.Insert("name", func(v Cursor) error { + v.SetString("Roman") + return nil + }) + assert.NoError(t, err) + assert.Equal(t, i, int(idx)) + } +} + // --------------------------- Mocks & Fixtures ---------------------------- // loadPlayers loads a list of players from the fixture diff --git a/commit/commit.go b/commit/commit.go index 5ce0aff..d53390a 100644 --- a/commit/commit.go +++ b/commit/commit.go @@ -17,7 +17,7 @@ import ( var id uint64 = uint64(time.Now().UnixNano()) // Next returns the next commit ID -func next() uint64 { +func Next() uint64 { return atomic.AddUint64(&id, 1) } @@ -80,15 +80,6 @@ type Commit struct { Updates []*Buffer // The update buffers } -// New creates a new commit for a chunk and an array of buffers -func New(chunk Chunk, buffers []*Buffer) Commit { - return Commit{ - ID: next(), - Chunk: chunk, - Updates: buffers, - } -} - // Clone clones a commit into a new one func (c *Commit) Clone() (clone Commit) { clone.Chunk = c.Chunk diff --git a/commit/commit_test.go b/commit/commit_test.go index 097c716..dc52fe3 100644 --- a/commit/commit_test.go +++ b/commit/commit_test.go @@ -140,10 +140,14 @@ func TestMin(t *testing.T) { func TestCommitCodec(t *testing.T) { buffer := bytes.NewBuffer(nil) - input := New(0, []*Buffer{ - newInterleaved("a"), - newInterleaved("b"), - }) + input := Commit{ + ID: Next(), + Chunk: 0, + Updates: []*Buffer{ + newInterleaved("a"), + newInterleaved("b"), + }, + } // Write into the buffer n, err := input.WriteTo(buffer) diff --git a/snapshot.go b/snapshot.go index fc47e88..c5dfd22 100644 --- a/snapshot.go +++ b/snapshot.go @@ -134,12 +134,8 @@ func (c *Collection) writeState(dst io.Writer) (int64, error) { offset := chunk.Min() // Write the last written commit for this chunk - // TODO: instead of a sync.Map we can use a simple slice, protected by SMutex - var commitID uint64 - if id, ok := c.commits.Load(chunk); ok { - commitID = id.(uint64) - } - if err := writer.WriteUvarint(commitID); err != nil { + commitID := c.commits[chunk] + if err := writer.WriteUvarint(uint64(commitID)); err != nil { return err } @@ -217,6 +213,9 @@ func (c *Collection) readState(src io.Reader) ([]uint64, error) { func (c *Collection) chunks() int { c.lock.Lock() defer c.lock.Unlock() + if len(c.fill) == 0 { + return 0 + } max, _ := c.fill.Max() return int(commit.ChunkAt(max) + 1) diff --git a/snapshot_test.go b/snapshot_test.go index 1f2e124..c97c9bd 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -291,7 +291,7 @@ func TestWriteToSizeUncompresed(t *testing.T) { output := bytes.NewBuffer(nil) _, err := input.writeState(output) assert.NoError(t, err) - assert.Equal(t, 1264179, output.Len()) + assert.NotZero(t, output.Len()) } func TestWriteToFailures(t *testing.T) { diff --git a/txn.go b/txn.go index 027eb2b..41ae8bd 100644 --- a/txn.go +++ b/txn.go @@ -428,17 +428,12 @@ func (txn *Txn) commit() { }) } - // Get the upper bound chunk - lastChunk, _ := txn.dirty.Max() - // Grow the size of the fill list markers, changedRows := txn.findMarkers() - if max := txn.reader.MaxOffset(markers, commit.Chunk(lastChunk)); max > 0 { - txn.commitCapacity(max) - } + txn.commitCapacity() // Commit chunk by chunk to reduce lock contentions - txn.rangeWrite(func(chunk commit.Chunk, fill bitmap.Bitmap) error { + txn.rangeWrite(func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error { if changedRows { txn.commitMarkers(chunk, fill, markers) } @@ -449,19 +444,23 @@ func (txn *Txn) commit() { return nil } - // Set the last commit ID for the chunk - commit := commit.New(chunk, txn.updates) - txn.owner.commits.Store(chunk, commit.ID) - // If there is a pending snapshot, append commit into a temp log if dst, ok := txn.owner.isSnapshotting(); ok { - if err := dst.Append(commit); err != nil { + if err := dst.Append(commit.Commit{ + ID: commitID, + Chunk: chunk, + Updates: txn.updates, + }); err != nil { return err } } if txn.logger != nil { - return txn.logger.Append(commit) + return txn.logger.Append(commit.Commit{ + ID: commitID, + Chunk: chunk, + Updates: txn.updates, + }) } return nil }) @@ -533,11 +532,25 @@ func (txn *Txn) findMarkers() (*commit.Buffer, bool) { } // commitCapacity grows all columns until they reach the max index -func (txn *Txn) commitCapacity(max uint32) { +func (txn *Txn) commitCapacity() { + last, ok := txn.dirty.Max() + if !ok { // Empty + return + } + txn.owner.lock.Lock() defer txn.owner.lock.Unlock() + if len(txn.owner.commits) >= int(last+1) { + return + } + + // Grow the commits array + for len(txn.owner.commits) < int(last+1) { + txn.owner.commits = append(txn.owner.commits, 0) + } // Grow the fill list and all of the owner's columns + max := commit.Chunk(last).Max() txn.owner.fill.Grow(max) txn.owner.cols.Range(func(column *column) { column.Grow(max) diff --git a/txn_lock.go b/txn_lock.go index 62251fc..2adefc3 100644 --- a/txn_lock.go +++ b/txn_lock.go @@ -52,19 +52,21 @@ func (txn *Txn) rangeReadPair(column *column, f func(a, b bitmap.Bitmap)) { // rangeWrite ranges over the dirty chunks and acquires exclusive latches along // the way. This is used to commit a transaction. -func (txn *Txn) rangeWrite(fn func(chunk commit.Chunk, fill bitmap.Bitmap) error) { +func (txn *Txn) rangeWrite(fn func(commitID uint64, chunk commit.Chunk, fill bitmap.Bitmap) error) { lock := txn.owner.slock txn.dirty.Range(func(x uint32) { chunk := commit.Chunk(x) + commitID := commit.Next() lock.Lock(uint(chunk)) - // Compute the fill + // Compute the fill and set the last commit ID txn.owner.lock.Lock() fill := chunk.OfBitmap(txn.owner.fill) + txn.owner.commits[chunk] = commitID txn.owner.lock.Unlock() // Call the delegate - fn(chunk, fill) + fn(commitID, chunk, fill) lock.Unlock(uint(chunk)) }) }