Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial snapshotting support #36

Merged
merged 5 commits into from
Dec 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ type Collection struct {
slock *smutex.SMutex128 // The sharded mutex for the collection
cols columns // The map of columns
fill bitmap.Bitmap // The fill-list
size int // The initial size for new columns
opts Options // The options configured
codec codec // The compression codec
writer commit.Writer // The commit writer
pk *columnKey // The primary key column
cancel context.CancelFunc // The cancellation function for the context
Expand Down Expand Up @@ -72,10 +73,11 @@ func NewCollection(opts ...Options) *Collection {
store := &Collection{
cols: makeColumns(8),
txns: newTxnPool(),
size: options.Capacity,
opts: options,
slock: new(smutex.SMutex128),
fill: make(bitmap.Bitmap, 0, options.Capacity>>6),
writer: options.Writer,
codec: newCodec(&options),
cancel: cancel,
}

Expand Down Expand Up @@ -232,7 +234,7 @@ func (c *Collection) CreateColumnsOf(object Object) error {

// CreateColumn creates a column of a specified type and adds it to the collection.
func (c *Collection) CreateColumn(columnName string, column Column) error {
column.Grow(uint32(c.size))
column.Grow(uint32(c.opts.Capacity))
c.cols.Store(columnName, columnFor(columnName, column))

// If necessary, create a primary key column
Expand Down Expand Up @@ -265,7 +267,7 @@ func (c *Collection) CreateIndex(indexName, columnName string, fn func(r Reader)
// Create and add the index column,
index := newIndex(indexName, columnName, fn)
c.lock.Lock()
index.Grow(uint32(c.size))
index.Grow(uint32(c.opts.Capacity))
c.cols.Store(indexName, index)
c.cols.Store(columnName, column, index)
c.lock.Unlock()
Expand Down Expand Up @@ -349,19 +351,6 @@ func (c *Collection) vacuum(ctx context.Context, interval time.Duration) {
}
}

// Replay applies a recorded commit to the collection. Inside a single query
// transaction it sets the commit's chunk on the transaction's dirty set and
// queues every non-empty update buffer from the commit, then returns whatever
// Query returns.
func (c *Collection) Replay(change commit.Commit) error {
	apply := func(txn *Txn) error {
		txn.dirty.Set(change.Chunk)
		for i := range change.Updates {
			// Empty buffers carry nothing to apply, so they are not queued.
			if change.Updates[i].IsEmpty() {
				continue
			}
			txn.updates = append(txn.updates, change.Updates[i])
		}
		return nil
	}
	return c.Query(apply)
}

// --------------------------- column registry ---------------------------

// columns represents a concurrent column registry.
Expand All @@ -384,12 +373,21 @@ type columnEntry struct {
cols []*column // The columns and its computed
}

// Count reports how many entries the column registry currently holds.
func (c *columns) Count() int {
	return len(c.cols.Load().([]columnEntry))
}

// Range iterates over the columns in the registry, calling fn with the first
// column of each entry. If fn returns a non-nil error, iteration stops and
// that error is returned; otherwise Range returns nil.
func (c *columns) Range(fn func(column *column)) {
func (c *columns) Range(fn func(column *column) error) error {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
fn(v.cols[0])
if err := fn(v.cols[0]); err != nil {
return err
}
}
return nil
}

// Load loads a column by its name.
Expand Down
148 changes: 11 additions & 137 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,27 @@ package column
import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/kelindar/async"
"github.com/kelindar/column/commit"
"github.com/stretchr/testify/assert"
)

/*
cpu: Intel(R) Core(TM) i7-9700K CPU @ 3.60GHz
BenchmarkCollection/insert-8 2167 578821 ns/op 1223 B/op 1 allocs/op
BenchmarkCollection/select-at-8 42703713 27.72 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 2032 598751 ns/op 49 B/op 0 allocs/op
BenchmarkCollection/count-8 800036 1498 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 16833 70556 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-at-8 3689354 323.6 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 1198 1003934 ns/op 4004 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 8071692 145.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 2328974 494.7 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/insert-8 2608 519498 ns/op 24298 B/op 500 allocs/op
BenchmarkCollection/select-at-8 42467803 27.63 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/scan-8 1976 574104 ns/op 88 B/op 0 allocs/op
BenchmarkCollection/count-8 783828 1516 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/range-8 17836 67879 ns/op 6 B/op 0 allocs/op
BenchmarkCollection/update-at-8 3707148 322.9 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/update-all-8 1183 976786 ns/op 4025 B/op 0 allocs/op
BenchmarkCollection/delete-at-8 9005782 130.1 ns/op 0 B/op 0 allocs/op
BenchmarkCollection/delete-all-8 2359329 493.3 ns/op 0 B/op 0 allocs/op
*/
func BenchmarkCollection(b *testing.B) {
b.Run("insert", func(b *testing.B) {
Expand Down Expand Up @@ -159,118 +154,6 @@ func BenchmarkCollection(b *testing.B) {
})
}

// TestReplicate runs the replication scenario 20 times, reseeding the global
// random source with the round number before each run.
func TestReplicate(t *testing.T) {
	const rounds = 20
	for seed := int64(0); seed < rounds; seed++ {
		rand.Seed(seed)
		runReplication(t, 10000, 50, runtime.NumCPU())
	}
}

// runReplication runs a concurrent replication test as a sub-test named
// "replicate-<updates>-<inserts>". It creates a primary collection whose
// commits are written to a channel and a replica that replays them, seeds the
// primary with `inserts` objects, schedules `updates` randomized tasks against
// the primary (an update, plus an occasional delete and insert), and finally
// checks that primary and replica report the same count.
//
// NOTE(review): the concurrency parameter is not referenced in the body; the
// consumer pool below is created with a literal 50 — confirm whether it was
// meant to be used there.
func runReplication(t *testing.T, updates, inserts, concurrency int) {
t.Run(fmt.Sprintf("replicate-%v-%v", updates, inserts), func(t *testing.T) {
// Commit channel with a buffer of 10 and the object used both as the
// schema template and as the inserted value.
writer := make(commit.Channel, 10)
object := map[string]interface{}{
"float64": float64(0),
"int32": int32(0),
"string": "",
}

// Create a primary
primary := NewCollection(Options{
Capacity: inserts,
Writer: &writer,
})
// Replica with the same schema
replica := NewCollection(Options{
Capacity: inserts,
})

// Create schemas and start streaming replication into the replica
// (the errors returned by CreateColumnsOf are not checked here).
primary.CreateColumnsOf(object)
replica.CreateColumnsOf(object)
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done() // Drained
// Runs until the writer channel is closed further below.
for change := range writer {
assert.NoError(t, replica.Replay(change))
}
}()

// Write some objects
for i := 0; i < inserts; i++ {
primary.InsertObject(object)
}

// Tasks are handed to an async consumer pool; the pool is cancelled when
// the sub-test function returns.
work := make(chan async.Task)
pool := async.Consume(context.Background(), 50, work)
defer pool.Cancel()

// Random concurrent updates
var wg sync.WaitGroup
wg.Add(updates)
for i := 0; i < updates; i++ {
work <- async.NewTask(func(ctx context.Context) (interface{}, error) {
defer wg.Done()

// Randomly update a column
// Int31n(n) returns a value in [0, n), so offsets here span
// [0, inserts-2]; it panics when n <= 0, i.e. when inserts <= 1.
offset := uint32(rand.Int31n(int32(inserts - 1)))
primary.UpdateAt(offset, "float64", func(v Cursor) error {
// Pick one of the three columns to modify.
switch rand.Int31n(3) {
case 0:
v.SetFloat64(math.Round(rand.Float64()*1000) / 100)
case 1:
v.SetInt32At("int32", rand.Int31n(100000))
case 2:
v.SetStringAt("string", fmt.Sprintf("hi %v", rand.Int31n(10)))
}
return nil
})

// Randomly delete an item (when Int31n(5) yields 0)
if rand.Int31n(5) == 0 {
primary.DeleteAt(uint32(rand.Int31n(int32(inserts - 1))))
}

// Randomly insert an item (when Int31n(5) yields 0)
if rand.Int31n(5) == 0 {
primary.InsertObject(object)
}
return nil, nil
})
}

// Replay all of the changes into the replica: wait for every task to
// finish, close the channel so the replay goroutine's range loop ends,
// then wait for that goroutine.
wg.Wait()
close(writer)
done.Wait()

// Check if replica and primary are the same
if !assert.Equal(t, primary.Count(), replica.Count(), "replica and primary should be the same size") {
return
}

// NOTE(review): both the cursor v and the selector s below come from the
// primary's transaction; the replica's values do not appear to be read
// in this comparison — confirm the intent.
primary.Query(func(txn *Txn) error {
return txn.Range("float64", func(v Cursor) {
v1, v2 := v.FloatAt("float64"), v.IntAt("int32")
if v1 != 0 {
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.FloatAt("float64"), s.FloatAt("float64"))
}))
}

if v2 != 0 {
assert.True(t, txn.SelectAt(v.idx, func(s Selector) {
assert.Equal(t, v.IntAt("int32"), s.IntAt("int32"))
}))
}
})
})
})
}

func TestCollection(t *testing.T) {
obj := Object{
"name": "Roman",
Expand Down Expand Up @@ -567,6 +450,8 @@ func TestInsertWithTTL(t *testing.T) {
})
}

// --------------------------- Mocks & Fixtures ----------------------------

// loadPlayers loads a list of players from the fixture
func loadPlayers(amount int) *Collection {
out := NewCollection(Options{
Expand Down Expand Up @@ -646,14 +531,3 @@ func loadFixture(name string) []Object {

return data
}

// noopWriter is a writer that discards every commit it receives and only
// counts how many times Write has been called.
type noopWriter struct {
	commits uint64 // Number of Write calls so far; updated atomically
}

// Write increments the commit counter and always returns nil. The commit
// itself is ignored: it is neither cloned nor stored. The parameter is left
// unnamed so that it does not shadow the imported commit package.
func (w *noopWriter) Write(_ commit.Commit) error {
	atomic.AddUint64(&w.commits, 1)
	return nil
}
6 changes: 6 additions & 0 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Column interface {
Value(idx uint32) (interface{}, bool)
Contains(idx uint32) bool
Index() *bitmap.Bitmap
Snapshot(*commit.Buffer)
}

// Numeric represents a column that stores numbers.
Expand Down Expand Up @@ -247,6 +248,11 @@ func (c *columnBool) Index() *bitmap.Bitmap {
return &c.data
}

// Snapshot records the whole boolean column in dst with a single PutBitmap
// call, passing the column's bitmap tagged as commit.PutTrue.
func (c *columnBool) Snapshot(dst *commit.Buffer) {
	bits := c.data
	dst.PutBitmap(commit.PutTrue, bits)
}

// --------------------------- funcs ----------------------------

// capacityFor computes the next power of 2 for a given index
Expand Down
7 changes: 7 additions & 0 deletions column_generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func (c *columnNumber) FilterUint64(offset uint32, index bitmap.Bitmap, predicat
})
}

// Snapshot copies the column into dst: for every index visited by the fill
// list's Range, it appends a commit.Put operation carrying that index and the
// value stored at it.
func (c *columnNumber) Snapshot(dst *commit.Buffer) {
	emit := func(idx uint32) {
		dst.PutNumber(commit.Put, idx, c.data[idx])
	}
	c.fill.Range(emit)
}

// --------------------------- Cursor Update ----------------------------

// SetNumber updates a column value for the current item. The actual operation
Expand Down
7 changes: 7 additions & 0 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ func (c *columnIndex) Index() *bitmap.Bitmap {
return &c.fill
}

// Snapshot copies the index into dst: for every index visited by the fill
// list's Range, it appends a commit.PutTrue operation for that index.
func (c *columnIndex) Snapshot(dst *commit.Buffer) {
	emit := func(idx uint32) {
		dst.PutOperation(commit.PutTrue, idx)
	}
	c.fill.Range(emit)
}

// --------------------------- Key ----------------------------

// columnKey represents the primary key column implementation
Expand Down
Loading