From 3e795a13a0f1217859fa4ff6efc8b1e00753a101 Mon Sep 17 00:00:00 2001 From: William Dreese Date: Sat, 17 Dec 2022 03:08:33 -0500 Subject: [PATCH] SortedIndex baseline implementation (#75) This PR introduces a new Sorted Index that keeps an actively sorted b-tree (github.com/tidwall/btree) for a column of the user's choosing (currently limited to string-type only). The index holds one b-tree that is not copied between transactions (mutexed). Future work would consider other type columns being sorted (currently only string columns), PK sorting, and custom `Less()` functionality for users. --- README.md | 25 +++++++ collection.go | 40 +++++++++++ column_index.go | 101 ++++++++++++++++++++++++-- commit/reader.go | 2 +- go.mod | 3 + go.sum | 6 ++ txn.go | 29 ++++++++ txn_test.go | 179 +++++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 379 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index b1d501b..0ad2c45 100644 --- a/README.md +++ b/README.md @@ -205,6 +205,31 @@ players.Query(func(txn *column.Txn) error { }) ``` +## Sorted Indexes + +Along with bitmap indexing, collections support consistently sorted indexes. These indexes are transient, and must be recreated when a collection is loading a snapshot. + +In the example below, we create a SortedIndex object and use it to sort filtered records in a transaction. 
+ +```go +// Create the sorted index "richest" in advance +out.CreateSortIndex("richest", "balance") + +// This filters the transaction with the `rogue` index before +// ranging through the remaining balances in ascending order +players.Query(func(txn *column.Txn) error { + name := txn.String("name") + balance := txn.Float64("balance") + + txn.With("rogue").Ascend("richest", func (i uint32) { + // save or do something with sorted record + curName, _ := name.Get() + balance.Set(newBalance(curName)) + }) + return nil +}) +``` + ## Updating Values In order to update certain items in the collection, you can simply call `Range()` method and use column accessor's `Set()` or `Add()` methods to update a value of a certain column atomically. The updates won't be instantly reflected given that our store supports transactions. Only when transaction is commited, then the update will be applied to the collection, allowing for isolation and rollbacks. diff --git a/collection.go b/collection.go index cf187d3..028fd3a 100644 --- a/collection.go +++ b/collection.go @@ -275,6 +275,46 @@ func (c *Collection) CreateIndex(indexName, columnName string, fn func(r Reader) return nil } +func (c *Collection) CreateSortIndex(indexName, columnName string) error { + if columnName == "" || indexName == "" { + return fmt.Errorf("column: create index must specify name & column") + } + + // Prior to creating an index, we should have a column + column, ok := c.cols.Load(columnName) + if !ok { + return fmt.Errorf("column: unable to create index, column '%v' does not exist", columnName) + } + + // Check to make sure index does not already exist + _, ok = c.cols.Load(indexName) + if ok { + return fmt.Errorf("column: unable to create index, index '%v' already exist", indexName) + } + + // Create and add the index column. + index := newSortIndex(indexName, columnName) + c.lock.Lock() + // index.Grow(uint32(c.opts.Capacity)) + c.cols.Store(indexName, index) + c.cols.Store(columnName, column, index) + 
c.lock.Unlock() + + // Iterate over all of the values of the target column, chunk by chunk and fill + // the index accordingly. + chunks := c.chunks() + buffer := commit.NewBuffer(c.Count()) + reader := commit.NewReader() + for chunk := commit.Chunk(0); int(chunk) < chunks; chunk++ { + if column.Snapshot(chunk, buffer) { + reader.Seek(buffer) + index.Apply(chunk, reader) + } + } + + return nil +} + // DropIndex removes the index column with the specified name. If the index with this // name does not exist, this operation is a no-op. func (c *Collection) DropIndex(indexName string) error { diff --git a/column_index.go b/column_index.go index 0b61f34..26eb091 100644 --- a/column_index.go +++ b/column_index.go @@ -4,8 +4,11 @@ package column import ( + "strings" "github.com/kelindar/bitmap" "github.com/kelindar/column/commit" + + "github.com/tidwall/btree" ) // --------------------------- Reader --------------------------- @@ -119,16 +122,16 @@ func newTrigger(indexName, columnName string, callback func(r Reader)) *column { }) } -// Grow grows the size of the column until we have enough to store -func (c *columnTrigger) Grow(idx uint32) { - // Noop -} - // Column returns the target name of the column on which this index should apply. func (c *columnTrigger) Column() string { return c.name } +// Grow grows the size of the column until we have enough to store +func (c *columnTrigger) Grow(idx uint32) { + // Noop +} + // Apply applies a set of operations to the column. 
func (c *columnTrigger) Apply(chunk commit.Chunk, r *commit.Reader) { for r.Next() { @@ -157,3 +160,91 @@ func (c *columnTrigger) Index(chunk commit.Chunk) bitmap.Bitmap { func (c *columnTrigger) Snapshot(chunk commit.Chunk, dst *commit.Buffer) { // Noop } + +// ----------------------- Sorted Index -------------------------- + +type SortIndexItem struct { + Key string + Value uint32 +} + +// columnSortIndex implements a constantly sorted column via BTree +type columnSortIndex struct { + btree *btree.BTreeG[SortIndexItem] // 1 constantly sorted data structure + backMap map[uint32]string // for constant key lookups + name string // The name of the target column +} + +// newSortIndex creates a new sorted index column backed by a b-tree. +func newSortIndex(indexName, columnName string) *column { + byKeys := func (a, b SortIndexItem) bool { + return a.Key < b.Key + } + return columnFor(indexName, &columnSortIndex{ + btree: btree.NewBTreeG[SortIndexItem](byKeys), + backMap: make(map[uint32]string), + name: columnName, + }) +} + +// Grow is a no-op for the sorted index. +func (c *columnSortIndex) Grow(idx uint32) { + return +} + +// Column returns the target name of the column on which this index should apply. +func (c *columnSortIndex) Column() string { + return c.name +} + + +// Apply applies a set of operations to the column. +func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) { + + // Index can only be updated based on the final stored value, so we can only work + // with put, merge, & delete operations here. 
+ for r.Next() { + switch r.Type { + case commit.Put: + if delKey, exists := c.backMap[r.Index()]; exists { + c.btree.Delete(SortIndexItem{ + Key: delKey, + Value: r.Index(), + }) + } + upsertKey := strings.Clone(r.String()) // alloc required + c.backMap[r.Index()] = upsertKey + c.btree.Set(SortIndexItem{ + Key: upsertKey, + Value: r.Index(), + }) + case commit.Delete: + delKey, _ := c.backMap[r.Index()] + c.btree.Delete(SortIndexItem{ + Key: delKey, + Value: r.Index(), + }) + } + } +} + + +// Value retrieves a value at a specified index. +func (c *columnSortIndex) Value(idx uint32) (v interface{}, ok bool) { + return nil, false +} + +// Contains checks whether the column has a value at a specified index. +func (c *columnSortIndex) Contains(idx uint32) bool { + return false +} + +// Index returns the fill list for the column +func (c *columnSortIndex) Index(chunk commit.Chunk) bitmap.Bitmap { + return nil +} + +// Snapshot writes the entire column into the specified destination buffer +func (c *columnSortIndex) Snapshot(chunk commit.Chunk, dst *commit.Buffer) { + // No-op +} diff --git a/commit/reader.go b/commit/reader.go index b0196eb..a07d74c 100644 --- a/commit/reader.go +++ b/commit/reader.go @@ -39,7 +39,7 @@ func (r *Reader) Rewind() { r.Offset = r.start } -// Use sets the buffer and resets the reader. +// use sets the buffer and resets the reader. 
func (r *Reader) use(buffer []byte) { r.buffer = buffer r.headString = 0 diff --git a/go.mod b/go.mod index 6054b3a..6128ec7 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/kelindar/column go 1.19 require ( + github.com/imdario/mergo v0.3.13 github.com/kelindar/bitmap v1.4.1 github.com/kelindar/intmap v1.1.0 github.com/kelindar/iostream v1.3.0 @@ -13,6 +14,8 @@ require ( github.com/zeebo/xxh3 v1.0.2 ) +require github.com/tidwall/btree v1.6.0 // indirect + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dustin/go-humanize v1.0.0 diff --git a/go.sum b/go.sum index 3319c6b..efcf7ca 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= github.com/kelindar/async v1.0.0 h1:oJiFAt3fVB/b5zVZKPBU+pP9lR3JVyeox9pYlpdnIK8= github.com/kelindar/async v1.0.0/go.mod h1:bJRlwaRiqdHi+4dpVDNHdwgyRyk6TxpA21fByLf7hIY= github.com/kelindar/bitmap v1.4.1 h1:Ih0BWMYXkkZxPMU536DsQKRhdvqFl7tuNjImfLJWC6E= @@ -31,6 +32,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/tidwall/btree v1.5.2 h1:5eA83Gfki799V3d3bJo9sWk+yL2LRoTEah3O/SA6/8w= +github.com/tidwall/btree v1.5.2/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/tidwall/btree v1.6.0 h1:LDZfKfQIBHGHWSwckhXI0RPSXzlo+KYdjK7FWSqOzzg= +github.com/tidwall/btree 
v1.6.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= @@ -43,5 +48,6 @@ golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/txn.go b/txn.go index cbea8f1..85ccc13 100644 --- a/txn.go +++ b/txn.go @@ -388,6 +388,35 @@ func (txn *Txn) Range(fn func(idx uint32)) error { return nil } +// Ascend through a given SortedIndex and returns each offset +// remaining in the transaction's index +func (txn *Txn) Ascend(sortIndexName string, fn func(idx uint32)) error { + txn.initialize() + txn.owner.lock.RLock() // protect against writes on btree + defer txn.owner.lock.RUnlock() + // lock := txn.owner.slock + + sortIndex, ok := txn.owner.cols.Load(sortIndexName) + if !ok { + return fmt.Errorf("column: no sorted index named '%v'", sortIndexName) + } + + // For each btree key, check if the offset is still in + // the txn's index & return if true + sortIndexCol, _ := sortIndex.Column.(*columnSortIndex) + sortIndexCol.btree.Scan(func (item SortIndexItem) bool { + if txn.index.Contains(item.Value) { + // chunk := commit.ChunkAt(item.Value) + // lock.RLock(uint(chunk)) + txn.cursor = item.Value + fn(item.Value) + // lock.RUnlock(uint(chunk)) + } + return true + }) 
+ return nil +} + // DeleteAll marks all of the items currently selected by this transaction for deletion. The // actual delete will take place once the transaction is committed. func (txn *Txn) DeleteAll() { diff --git a/txn_test.go b/txn_test.go index 185cb86..f41502d 100644 --- a/txn_test.go +++ b/txn_test.go @@ -7,7 +7,9 @@ import ( "fmt" "sync" "testing" + "strconv" + "github.com/kelindar/xxrand" "github.com/kelindar/column/commit" "github.com/stretchr/testify/assert" ) @@ -259,6 +261,183 @@ func TestIndexed(t *testing.T) { }) } +func TestSortIndex(t *testing.T) { + c := NewCollection() + c.CreateColumn("col1", ForString()) + c.CreateSortIndex("sortedCol1", "col1") + + assert.Error(t, c.CreateSortIndex("", "")) + assert.Error(t, c.CreateSortIndex("no_col", "nonexistent")) + assert.Error(t, c.CreateSortIndex("sortedCol1", "col1")) + + indexCol, _ := c.cols.Load("sortedCol1") + assert.Equal(t, "col1", indexCol.Column.(*columnSortIndex).Column()) + assert.False(t, indexCol.Column.(*columnSortIndex).Contains(0)) + assert.Nil(t, indexCol.Column.(*columnSortIndex).Index(0)) + v, ok := indexCol.Column.(*columnSortIndex).Value(0) + assert.Nil(t, v) + assert.False(t, ok) + assert.NotPanics(t, func() { + indexCol.Column.(*columnSortIndex).Grow(100) + indexCol.Column.(*columnSortIndex).Snapshot(0, nil) + }) + + // Inserts + c.Insert(func (r Row) error { + r.SetString("col1", "bob") + return nil + }) + c.Insert(func (r Row) error { + r.SetString("col1", "carter") + return nil + }) + c.Insert(func (r Row) error { + r.SetString("col1", "dan") + return nil + }) + c.Insert(func (r Row) error { + r.SetString("col1", "alice") + return nil + }) + + // Update + assert.NoError(t, c.QueryAt(3, func(r Row) error { + r.SetString("col1", "rob") + return nil + })) + assert.Equal(t, 4, indexCol.Column.(*columnSortIndex).btree.Len()) + + // Delete + assert.Equal(t, true, c.DeleteAt(1)) + assert.Equal(t, 3, indexCol.Column.(*columnSortIndex).btree.Len()) + + // Range + assert.Error(t, 
c.Query(func (txn *Txn) error { + return txn.Ascend("nonexistent", func (i uint32) { + return + }) + })) + + var res [3]string + var resN int = 0 + c.Query(func (txn *Txn) error { + col1 := txn.String("col1") + return txn.Ascend("sortedCol1", func (i uint32) { + name, _ := col1.Get() + res[resN] = name + resN++ + }) + }) + + assert.Equal(t, "bob", res[0]) + assert.Equal(t, "dan", res[1]) + assert.Equal(t, "rob", res[2]) +} + +func TestSortIndexLoad(t *testing.T) { + + players := loadPlayers(500) + players.CreateSortIndex("sorted_names", "name") + + checkN := 0 + checks := map[int]string{ + 4: "Buckner Frazier", + 16: "Marla Todd", + 30: "Shelly Kirk", + 35: "out of range", + } + + players.Query(func (txn *Txn) error { + txn = txn.With("human", "mage") + name := txn.String("name") + txn.Ascend("sorted_names", func (i uint32) { + n, _ := name.Get() + if res, exists := checks[checkN]; exists { + assert.Equal(t, res, n) + } + checkN++ + }) + return nil + }) + +} + +func TestSortIndexChunks(t *testing.T) { + N := 100_000 + obj := map[string]any{ + "name": "1", + "balance": 12.5, + } + + players := NewCollection() + players.CreateColumnsOf(obj) + players.CreateSortIndex("sorted_names", "name") + + for i := 0; i < N; i++ { + players.Insert(func (r Row) error { + return r.SetMany(map[string]any{ + "name": strconv.Itoa(i), + "balance": float64(i) + 0.5, + }) + }) + } + + players.Query(func (txn *Txn) error { + name := txn.String("name") + txn.Ascend("sorted_names", func (i uint32) { + n, _ := name.Get() + if i % 400 == 0 { + nInt, _ := strconv.Atoi(n) + assert.Equal(t, nInt, int(i)) + } + }) + return nil + }) + + // Concurrency Test + var wg sync.WaitGroup + order := new(sync.WaitGroup) + wg.Add(2) + order.Add(1) + + // Do the same test as before at the same time as other updates + go func() { + players.Query(func (txn *Txn) error { + name := txn.String("name") + order.Done() // Ensure this txn begins before update + txn.Ascend("sorted_names", func (i uint32) { + n, _ := 
name.Get() + if i % 400 == 0 { + nInt, _ := strconv.Atoi(n) + assert.Equal(t, nInt, int(i)) + } + }) + return nil + }) + wg.Done() + }() + + go func() { + order.Wait() // Wait for scan to begin + idx1 := xxrand.Uint32n(uint32(N / 400)) * 400 // hit checked idxs only + idx2 := xxrand.Uint32n(uint32(N / 400)) * 400 + players.Insert(func (r Row) error { + r.SetString("name", "new") + r.SetFloat64("balance", 43.2) + return nil + }) + players.QueryAt(idx1, func (r Row) error { + r.SetString("name", "updated") + return nil + }) + players.DeleteAt(idx2) + wg.Done() + }() + + wg.Wait() + assert.Equal(t, 100_000, players.Count()) +} + func TestDeleteAll(t *testing.T) { players := loadPlayers(500) assert.Equal(t, 500, players.Count())