Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

int64 primary key #106

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,28 +379,28 @@ func (c *Collection) Close() error {
// --------------------------- Primary Key ----------------------------

// InsertKey inserts a row given its corresponding primary key.
func (c *Collection) InsertKey(key string, fn func(Row) error) error {
func (c *Collection) InsertKey(key int64, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.InsertKey(key, fn)
})
}

// UpsertKey inserts or updates a row given its corresponding primary key.
func (c *Collection) UpsertKey(key string, fn func(Row) error) error {
func (c *Collection) UpsertKey(key int64, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.UpsertKey(key, fn)
})
}

// QueryKey queries/updates a row given its corresponding primary key.
func (c *Collection) QueryKey(key string, fn func(Row) error) error {
func (c *Collection) QueryKey(key int64, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.QueryKey(key, fn)
})
}

// DeleteKey deletes a row for a given primary key.
func (c *Collection) DeleteKey(key string) error {
func (c *Collection) DeleteKey(key int64) error {
return c.Query(func(txn *Txn) error {
return txn.DeleteKey(key)
})
Expand Down
99 changes: 99 additions & 0 deletions column_keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package column

import (
"fmt"
"sync"

"github.com/kelindar/column/commit"
)

// --------------------------- Key ----------------------------

// columnKey represents the primary key column implementation
type columnKey struct {
numericColumn[int64]
name string // Name of the column
lock sync.RWMutex // Lock to protect the lookup table
seek map[int64]uint32 // Lookup table for O(1) index seek
}

// makeKey creates a new primary key column
func makeKey() Column {
col := makeInt64s().(*numericColumn[int64])
return &columnKey{
seek: make(map[int64]uint32, 64),
numericColumn: *col,
}
}

// Apply applies a set of operations to the column.
func (c *columnKey) Apply(chunk commit.Chunk, r *commit.Reader) {
fill, data := c.chunkAt(chunk)
from := chunk.Min()

for r.Next() {
offset := r.Offset - int32(from)
switch r.Type {
case commit.Put:
value := r.Int64()

fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = value
c.lock.Lock()
c.seek[value] = uint32(r.Offset)
c.lock.Unlock()

case commit.Delete:
fill.Remove(uint32(offset))
c.lock.Lock()
delete(c.seek, data[offset])
c.lock.Unlock()
}
}
}

// OffsetOf returns the offset for a particular value
func (c *columnKey) OffsetOf(v int64) (uint32, bool) {
c.lock.RLock()
idx, ok := c.seek[v]
c.lock.RUnlock()
return idx, ok
}

// rwKey represents read-write accessor for primary keys.
type rwKey struct {
cursor *uint32
writer *commit.Buffer
reader *columnKey
}

// Set sets the value at the current transaction index
func (s rwKey) Set(value int64) error {
if _, ok := s.reader.OffsetOf(value); !ok {
s.writer.PutInt64(commit.Put, *s.cursor, value)
return nil
}

return fmt.Errorf("column: unable to set duplicate key '%d'", value)
}

// Get loads the value at the current transaction index
func (s rwKey) Get() (int64, bool) {
return s.reader.LoadInt64(*s.cursor)
}

// Enum returns a enumerable column accessor
func (txn *Txn) Key() rwKey {
if txn.owner.pk == nil {
panic(fmt.Errorf("column: primary key column does not exist"))
}

return rwKey{
cursor: &txn.cursor,
writer: txn.bufferFor(txn.owner.pk.name),
reader: txn.owner.pk,
}
}
91 changes: 0 additions & 91 deletions column_strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
package column

import (
"fmt"
"math"
"sync"

"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"
Expand Down Expand Up @@ -256,95 +254,6 @@ func (txn *Txn) String(columnName string) rwString {
}
}

// --------------------------- Key ----------------------------

// columnKey represents the primary key column implementation
type columnKey struct {
columnString
name string // Name of the column
lock sync.RWMutex // Lock to protect the lookup table
seek map[string]uint32 // Lookup table for O(1) index seek
}

// makeKey creates a new primary key column
func makeKey() Column {
return &columnKey{
seek: make(map[string]uint32, 64),
columnString: columnString{
chunks: make(chunks[string], 0, 4),
},
}
}

// Apply applies a set of operations to the column.
func (c *columnKey) Apply(chunk commit.Chunk, r *commit.Reader) {
fill, data := c.chunkAt(chunk)
from := chunk.Min()

for r.Next() {
offset := r.Offset - int32(from)
switch r.Type {
case commit.Put:
value := string(r.Bytes())

fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = value
c.lock.Lock()
c.seek[value] = uint32(r.Offset)
c.lock.Unlock()

case commit.Delete:
fill.Remove(uint32(offset))
c.lock.Lock()
delete(c.seek, string(data[offset]))
c.lock.Unlock()
}
}
}

// OffsetOf returns the offset for a particular value
func (c *columnKey) OffsetOf(v string) (uint32, bool) {
c.lock.RLock()
idx, ok := c.seek[v]
c.lock.RUnlock()
return idx, ok
}

// rwKey represents read-write accessor for primary keys.
type rwKey struct {
cursor *uint32
writer *commit.Buffer
reader *columnKey
}

// Set sets the value at the current transaction index
func (s rwKey) Set(value string) error {
if _, ok := s.reader.OffsetOf(value); !ok {
s.writer.PutString(commit.Put, *s.cursor, value)
return nil
}

return fmt.Errorf("column: unable to set duplicate key '%s'", value)
}

// Get loads the value at the current transaction index
func (s rwKey) Get() (string, bool) {
return s.reader.LoadString(*s.cursor)
}

// Enum returns a enumerable column accessor
func (txn *Txn) Key() rwKey {
if txn.owner.pk == nil {
panic(fmt.Errorf("column: primary key column does not exist"))
}

return rwKey{
cursor: &txn.cursor,
writer: txn.bufferFor(txn.owner.pk.name),
reader: txn.owner.pk,
}
}

// --------------------------- Reader ----------------------------

// rdString represents a read-only accessor for strings
Expand Down
32 changes: 17 additions & 15 deletions column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,15 +255,15 @@ func TestForKindInvalid(t *testing.T) {
}

func TestAtKey(t *testing.T) {
const testKey = "key=20"
const testKey = 20

// Update a name
players := loadPlayers(500)
players.CreateColumn("pk", ForKey())
assert.NoError(t, players.Query(func(txn *Txn) error {
pk := txn.Key()
return txn.Range(func(idx uint32) {
pk.Set(fmt.Sprintf("key=%d", idx))
pk.Set(int64(idx))
})
}))

Expand All @@ -290,31 +290,31 @@ func TestAtKey(t *testing.T) {

func TestUpdateAtKeyWithoutPK(t *testing.T) {
col := NewCollection()
assert.Error(t, col.QueryKey("test", func(r Row) error {
assert.Error(t, col.QueryKey(1234, func(r Row) error {
r.SetEnum("name", "Roman")
return nil
}))
}

func TestSelectAtKeyWithoutPK(t *testing.T) {
col := NewCollection()
assert.Error(t, col.QueryKey("test", func(r Row) error { return nil }))
assert.Error(t, col.InsertKey("test", func(r Row) error { return nil }))
assert.Error(t, col.UpsertKey("test", func(r Row) error { return nil }))
assert.Error(t, col.DeleteKey("test"))
assert.Error(t, col.QueryKey(1234, func(r Row) error { return nil }))
assert.Error(t, col.InsertKey(1234, func(r Row) error { return nil }))
assert.Error(t, col.UpsertKey(1234, func(r Row) error { return nil }))
assert.Error(t, col.DeleteKey(1234))
}

func TestBulkUpdateDuplicatePK(t *testing.T) {
col := NewCollection()
col.CreateColumn("key", ForKey())
assert.NoError(t, col.InsertKey("1", func(r Row) error { return nil }))
assert.NoError(t, col.InsertKey("2", func(r Row) error { return nil }))
assert.NoError(t, col.InsertKey(1, func(r Row) error { return nil }))
assert.NoError(t, col.InsertKey(2, func(r Row) error { return nil }))

// If we attempt to change to an already persisted key, we should get an error
assert.NoError(t, col.Query(func(txn *Txn) error {
pk := txn.Key()
assert.Error(t, txn.QueryKey("1", func(Row) error {
return pk.Set("2")
assert.Error(t, txn.QueryKey(1, func(Row) error {
return pk.Set(2)
}))
return nil
}))
Expand Down Expand Up @@ -489,8 +489,9 @@ func TestPKAccessor(t *testing.T) {
col := NewCollection()
assert.NoError(t, col.CreateColumn("name", ForKey()))

const Roman = 1234
// Insert a primary key value
err := col.InsertKey("Roman", func(r Row) error {
err := col.InsertKey(Roman, func(r Row) error {
return nil
})
assert.NoError(t, err)
Expand All @@ -499,7 +500,7 @@ func TestPKAccessor(t *testing.T) {
col.QueryAt(0, func(r Row) error {
value, ok := r.txn.Key().Get()
assert.True(t, ok)
assert.Equal(t, "Roman", value)
assert.Equal(t, Roman, value)
return nil
})
}
Expand Down Expand Up @@ -529,13 +530,14 @@ func TestDuplicatePK(t *testing.T) {
col := NewCollection()
assert.NoError(t, col.CreateColumn("name", ForKey()))

const Roman = 1234
// Insert a primary key value
assert.NoError(t, col.InsertKey("Roman", func(r Row) error {
assert.NoError(t, col.InsertKey(Roman, func(r Row) error {
return nil
}))

// Insert a duplicate
assert.Error(t, col.InsertKey("Roman", func(r Row) error {
assert.Error(t, col.InsertKey(Roman, func(r Row) error {
return nil
}))

Expand Down
7 changes: 4 additions & 3 deletions examples/bench/bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand All @@ -23,7 +24,7 @@ var (
)

func main() {
amount := 1000000
amount := 1_000_000
players := column.NewCollection(column.Options{
Capacity: amount,
})
Expand Down Expand Up @@ -59,10 +60,10 @@ func main() {
func runBenchmark(name string, fn func(bool) (int, int)) {
fmt.Printf("Benchmarking %v ...\n", name)
fmt.Printf("%7v\t%6v\t%17v\t%13v\n", "WORK", "PROCS", "READ RATE", "WRITE RATE")
for _, workload := range []int{0, 10, 50, 90, 100} {
for _, workload := range []int{0, 10, 25, 50, 75, 90, 100} {

// Iterate over various concurrency levels
for _, n := range []int{1, 2, 4, 8, 16, 32, 64, 128, 256, 512} {
for n := 1; n <= runtime.NumCPU()*2; n *= 2 {
work := make(chan async.Task, n)
pool := async.Consume(context.Background(), n, work)

Expand Down
4 changes: 2 additions & 2 deletions examples/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func New() *Cache {
}

// Get attempts to retrieve a value for a key
func (c *Cache) Get(key string) (value string, found bool) {
func (c *Cache) Get(key int64) (value string, found bool) {
c.store.QueryKey(key, func(r column.Row) error {
value, found = r.String("val")
return nil
Expand All @@ -33,7 +33,7 @@ func (c *Cache) Get(key string) (value string, found bool) {
}

// Set updates or inserts a new value
func (c *Cache) Set(key, value string) {
func (c *Cache) Set(key int64, value string) {
if err := c.store.UpsertKey(key, func(r column.Row) error {
r.SetString("val", value)
return nil
Expand Down
Loading