Skip to content

Commit

Permalink
Found race case with backMap in SortedIndex (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dreeseaw authored Dec 24, 2022
1 parent 7c1c66e commit fd7111f
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 4 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ In the example below, we create a SortedIndex object and use it to sort filtered
out.CreateSortIndex("richest", "balance")

// This filters the transaction with the `rouge` index before
// ranging through the remaining balances in ascending order
// ranging through the remaining balances by ascending order
players.Query(func(txn *column.Txn) error {
name := txn.String("name")
balance := txn.Float64("balance")
Expand Down
89 changes: 89 additions & 0 deletions collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"runtime"
"strconv"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -591,6 +592,94 @@ func TestInsertParallel(t *testing.T) {
}))
}

func BenchmarkParallelSort(b *testing.B) {
getobj := func(n string) map[string]any {
return map[string]any{
"name": n,
"age": 35,
"wallet": 50.99,
"health": 100,
"mana": 200,
}
}

b.Run("in-asc", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
col := NewCollection()
col.CreateColumnsOf(getobj("n"))
col.CreateSortIndex("sorted_name", "name")
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 20; i++ {
go func(ii int) {
for x := 0; x < 5000; x++ {
tobj := getobj("n")
tobj["name"] = strconv.Itoa((ii * 20) + x)
col.Insert(func(r Row) error {
return r.SetMany(tobj)
})
}
wg.Done()
}(i)
go func(ii int) {
for y := 0; y < 5; y++ {
col.Query(func(txn *Txn) error {
health := txn.Int("health")
return txn.Ascend("sorted_name", func(i uint32) {
health.Set((ii * 20) + y)
})
})
}
}(i)
}
wg.Wait()
}
})
}

func TestParallelSort(t *testing.T) {
getobj := func(n string) map[string]any {
return map[string]any{
"name": n,
"age": 35,
"wallet": 50.99,
"health": 100,
"mana": 200,
}
}

col := NewCollection()
col.CreateColumnsOf(getobj("n"))
col.CreateSortIndex("sorted_name", "name")

var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 20; i++ {
go func(ii int) {
for x := 0; x < 5000; x++ {
tobj := getobj("n")
tobj["name"] = strconv.Itoa((ii * 20) + x)
col.Insert(func(r Row) error {
return r.SetMany(tobj)
})
}
wg.Done()
}(i)
go func(ii int) {
col.Query(func(txn *Txn) error {
health := txn.Int("health")
return txn.Ascend("sorted_name", func(i uint32) {
health.Set(ii)
})
})
}(i)
}
wg.Wait()
assert.Equal(t, 100_000, col.Count())
}

func TestConcurrentPointReads(t *testing.T) {
obj := map[string]any{
"name": "Roman",
Expand Down
10 changes: 7 additions & 3 deletions column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package column

import (
"strings"
"sync"

"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"
Expand Down Expand Up @@ -171,9 +172,10 @@ type sortIndexItem struct {

// columnSortIndex implements a constantly sorted column via BTree
type columnSortIndex struct {
btree *btree.BTreeG[sortIndexItem] // 1 constantly sorted data structure
backMap map[uint32]string // for constant key lookups
name string // The name of the target column
btree *btree.BTreeG[sortIndexItem] // 1 constantly sorted data structure
backMap map[uint32]string // for constant key lookups
backLock sync.Mutex // protect backMap access
name string // The name of the target column
}

// newSortIndex creates a new bitmap index column.
Expand Down Expand Up @@ -204,6 +206,7 @@ func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) {
// Index can only be updated based on the final stored value, so we can only work
// with put, merge, & delete operations here.
for r.Next() {
c.backLock.Lock()
switch r.Type {
case commit.Put:
if delKey, exists := c.backMap[r.Index()]; exists {
Expand All @@ -225,6 +228,7 @@ func (c *columnSortIndex) Apply(chunk commit.Chunk, r *commit.Reader) {
Value: r.Index(),
})
}
c.backLock.Unlock()
}
}

Expand Down

0 comments on commit fd7111f

Please sign in to comment.