Skip to content

Commit

Permalink
refactor: Strongly typed key refactor (sourcenetwork#17)
Browse files Browse the repository at this point in the history
* Reference correct Key type in documentation

* Remove commented out import

* Remove commented out import

* Remove commented out code

* Remove commented out import

* Replace references to ds.Key with core.Key

* Replace ds.Key with structured data type

* Remove duplicate call

WithPriorityFlag is called within reg.getPriority and should not be the responsibility of the caller

* Remove DataStoreKey from Collection struct

Is not needed and complicates the code

* Remove duplicate call

WithValueFlag is called in writeObjectMarker

* Take docKey string as param instead of DataStoreKey

Is wasteful taking the whole struct (and initing one in most cases)

* Documet commit key changes
  • Loading branch information
AndrewSisley authored Mar 4, 2022
1 parent 59c9f91 commit 68783bb
Show file tree
Hide file tree
Showing 58 changed files with 1,018 additions and 646 deletions.
2 changes: 1 addition & 1 deletion client/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Collection interface {

WithTxn(core.Txn) Collection

GetPrimaryIndexDocKey(ds.Key) ds.Key
GetPrimaryIndexDocKey(core.DataStoreKey) core.DataStoreKey
GetAllDocKeys(ctx context.Context) (<-chan DocKeysResult, error)
}

Expand Down
57 changes: 12 additions & 45 deletions core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,38 @@ import (
"github.com/sourcenetwork/defradb/core"
)

var (
keysNs = "k" // /keys namespace /set/k/<key>/{v,p}
valueSuffix = "v" // value key
prioritySuffix = "p" // priority key
)

// baseCRDT is embedded as a base layer into all
// the core CRDT implementations to reduce code
// duplication, and better manage the overhead
// tasks that all the CRDTs need to implement anyway
type baseCRDT struct {
store core.DSReaderWriter
namespace ds.Key
keysNs string
valueSuffix string
prioritySuffix string
store core.DSReaderWriter
key core.DataStoreKey
}

// @todo paramaterize ns/suffix
func newBaseCRDT(store core.DSReaderWriter, namespace ds.Key) baseCRDT {
// @TODO paramaterize ns/suffix
func newBaseCRDT(store core.DSReaderWriter, key core.DataStoreKey) baseCRDT {
return baseCRDT{
store: store,
namespace: namespace,
keysNs: keysNs,
valueSuffix: valueSuffix,
prioritySuffix: prioritySuffix,
store: store,
key: key,
}
}

func (base baseCRDT) keyPrefix(key string) ds.Key {
return base.namespace.ChildString(key)
}

func (base baseCRDT) valueKey(key string) ds.Key {
return base.namespace.ChildString(key).Instance(base.valueSuffix)
}

func (base baseCRDT) priorityKey(key string) ds.Key {
return base.namespace.ChildString(key).Instance(base.prioritySuffix)
}

// Commented because this function is unused (for linter).
// func (base baseCRDT) typeKey(key string) ds.Key {
// return base.namespace.ChildString(key).Instance(crdtTypeSuffix)
// }

// func (base baseCRDT) dataTypeKey(key string) ds.Key {
// return base.namespace.ChildString(key).Instance(dataTypeSuffix)
// }

func (base baseCRDT) setPriority(ctx context.Context, key string, priority uint64) error {
prioK := base.priorityKey(key)
func (base baseCRDT) setPriority(ctx context.Context, key core.DataStoreKey, priority uint64) error {
prioK := key.WithPriorityFlag()
buf := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(buf, priority+1)
if n == 0 {
return errors.New("error encoding priority")
}

return base.store.Put(ctx, prioK, buf[0:n])
return base.store.Put(ctx, prioK.ToDS(), buf[0:n])
}

// get the current priority for given key
func (base baseCRDT) getPriority(ctx context.Context, key string) (uint64, error) {
pKey := base.priorityKey(key)
pbuf, err := base.store.Get(ctx, pKey)
func (base baseCRDT) getPriority(ctx context.Context, key core.DataStoreKey) (uint64, error) {
pKey := key.WithPriorityFlag()
pbuf, err := base.store.Get(ctx, pKey.ToDS())
if err != nil {
if err == ds.ErrNotFound {
return 0, nil
Expand Down
36 changes: 10 additions & 26 deletions core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,58 +28,42 @@ func newSeededDS() core.DSReaderWriter {
}

func exampleBaseCRDT() baseCRDT {
return newBaseCRDT(newSeededDS(), ds.NewKey("test"))
return newBaseCRDT(newSeededDS(), core.DataStoreKey{})
}

func TestBaseCRDTNew(t *testing.T) {
base := newBaseCRDT(newDS(), ds.NewKey("test"))
base := newBaseCRDT(newDS(), core.DataStoreKey{})
if base.store == nil {
t.Error("newBaseCRDT needs to init store")
} else if base.namespace.String() == "" {
t.Error("newBaseCRDT needs to init namespace")
} else if base.keysNs == "" {
t.Error("newBaseCRDT needs to init KeyNS")
} else if base.valueSuffix == "" {
t.Error("newBaseCRDT needs to init valueSuffix")
} else if base.prioritySuffix == "" {
t.Error("newBaseCRDT needs to init prioritySuffix")
}
}

func TestBaseCRDTKeyPrefix(t *testing.T) {
base := exampleBaseCRDT()
kp := base.keyPrefix("key1")
if kp.String() != "/test/key1" {
t.Errorf("Incorrect keyPrefix. Have %v, want %v", kp.String(), "/test/key1")
}
}

func TestBaseCRDTvalueKey(t *testing.T) {
base := exampleBaseCRDT()
vk := base.valueKey("mykey")
if vk.String() != "/test/mykey:v" {
t.Errorf("Incorrect valueKey. Have %v, want %v", vk.String(), "/test/k/mykey/v")
vk := base.key.WithDocKey("mykey").WithValueFlag()
if vk.ToString() != "/mykey:v" {
t.Errorf("Incorrect valueKey. Have %v, want %v", vk.ToString(), "/mykey:v")
}
}

func TestBaseCRDTprioryKey(t *testing.T) {
base := exampleBaseCRDT()
pk := base.priorityKey("mykey")
if pk.String() != "/test/mykey:p" {
t.Errorf("Incorrect priorityKey. Have %v, want %v", pk.String(), "/test/k/mykey/p")
pk := base.key.WithDocKey("mykey").WithPriorityFlag()
if pk.ToString() != "/mykey:p" {
t.Errorf("Incorrect priorityKey. Have %v, want %v", pk.ToString(), "/mykey:p")
}
}

func TestBaseCRDTSetGetPriority(t *testing.T) {
base := exampleBaseCRDT()
ctx := context.Background()
err := base.setPriority(ctx, "mykey", 10)
err := base.setPriority(ctx, base.key.WithDocKey("mykey"), 10)
if err != nil {
t.Errorf("baseCRDT failed to set Priority. err: %v", err)
return
}

priority, err := base.getPriority(ctx, "mykey")
priority, err := base.getPriority(ctx, base.key.WithDocKey("mykey"))
if err != nil {
t.Errorf("baseCRDT failed to get priority. err: %v", err)
return
Expand Down
3 changes: 1 addition & 2 deletions core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/sourcenetwork/defradb/core"

ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
"github.com/ugorji/go/codec"
Expand Down Expand Up @@ -81,7 +80,7 @@ type CompositeDAG struct {
schemaID string
}

func NewCompositeDAG(store core.DSReaderWriter, schemaID string, namespace ds.Key, key string) CompositeDAG {
func NewCompositeDAG(store core.DSReaderWriter, schemaID string, namespace core.Key, key string) CompositeDAG {
return CompositeDAG{
key: key,
schemaID: schemaID,
Expand Down
21 changes: 9 additions & 12 deletions core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/sourcenetwork/defradb/core"

ds "github.com/ipfs/go-datastore"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"

Expand Down Expand Up @@ -78,14 +77,12 @@ func (delta *LWWRegDelta) Value() interface{} {
// arbitrary data type that ensures convergence
type LWWRegister struct {
baseCRDT
key string
}

// NewLWWRegister returns a new instance of the LWWReg with the given ID
func NewLWWRegister(store core.DSReaderWriter, namespace ds.Key, key string) LWWRegister {
func NewLWWRegister(store core.DSReaderWriter, key core.DataStoreKey) LWWRegister {
return LWWRegister{
baseCRDT: newBaseCRDT(store, namespace),
key: key,
baseCRDT: newBaseCRDT(store, key),
// id: id,
// data: data,
// ts: ts,
Expand All @@ -96,8 +93,8 @@ func NewLWWRegister(store core.DSReaderWriter, namespace ds.Key, key string) LWW
// Value gets the current register value
// RETURN STATE
func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
valueK := reg.valueKey(reg.key)
buf, err := reg.store.Get(ctx, valueK)
valueK := reg.key.WithValueFlag()
buf, err := reg.store.Get(ctx, valueK.ToDS())
if err != nil {
return nil, err
}
Expand All @@ -112,12 +109,12 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
// return NewLWWRegister(reg.id, value, reg.clock.Apply(), reg.clock)
return &LWWRegDelta{
Data: value,
DocKey: []byte(reg.key),
DocKey: reg.key.Bytes(),
}
}

func (reg LWWRegister) ID() string {
return reg.key
return reg.key.ToString()
}

// RETURN DELTA
Expand Down Expand Up @@ -150,19 +147,19 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
// if the current priority is higher ignore put
// else if the current value is lexicographically
// greater than the new then ignore
valueK := reg.valueKey(reg.key)
valueK := reg.key.WithValueFlag()
if priority < curPrio {
return nil
} else if priority == curPrio {
curValue, _ := reg.store.Get(ctx, valueK)
curValue, _ := reg.store.Get(ctx, valueK.ToDS())
if bytes.Compare(curValue, val) >= 0 {
return nil
}
}

// prepend the value byte array with a single byte indicator for the CRDT Type.
buf := append([]byte{byte(core.LWW_REGISTER)}, val...)
err = reg.store.Put(ctx, valueK, buf)
err = reg.store.Put(ctx, valueK.ToDS(), buf)
if err != nil {
return fmt.Errorf("Failed to store new value : %w", err)
}
Expand Down
6 changes: 2 additions & 4 deletions core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/store"

// "github.com/sourcenetwork/defradb/store"
"github.com/ugorji/go/codec"

ipld "github.com/ipfs/go-ipld-format"
Expand All @@ -34,9 +33,8 @@ func newMockStore() core.DSReaderWriter {

func setupLWWRegister() LWWRegister {
store := newMockStore()
ns := ds.NewKey("defra/test")
id := "AAAA-BBBB"
return NewLWWRegister(store, ns, id)
key := core.DataStoreKey{DocKey: "AAAA-BBBB"}
return NewLWWRegister(store, key)
}

func setupLoadedLWWRegster(ctx context.Context) LWWRegister {
Expand Down
42 changes: 24 additions & 18 deletions core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import "strings"
// Span is a range of keys from [Start, End)
type Span interface {
// Start returns the starting key of the Span
Start() Key
Start() DataStoreKey
// End returns the ending key of the Span
End() Key
End() DataStoreKey
// Contains returns true of the Span contains the provided Span's range
Contains(Span) bool
// Equal returns true if the provided Span is equal to the current
Expand All @@ -27,24 +27,25 @@ type Span interface {
}

type span struct {
start Key
end Key
Span
start DataStoreKey
end DataStoreKey
}

func NewSpan(start, end Key) Span {
func NewSpan(start, end DataStoreKey) Span {
return span{
start: start,
end: end,
}
}

// Start returns the starting key of the Span
func (s span) Start() Key {
func (s span) Start() DataStoreKey {
return s.start
}

// End returns the ending key of the Span
func (s span) End() Key {
func (s span) End() DataStoreKey {
return s.end
}

Expand Down Expand Up @@ -84,10 +85,10 @@ func (this span) Compare(other Span) SpanComparisonResult {
return Equal
}

thisStart := this.start.String()
thisEnd := this.end.String()
otherStart := other.Start().String()
otherEnd := other.End().String()
thisStart := this.start.ToString()
thisEnd := this.end.ToString()
otherStart := other.Start().ToString()
otherEnd := other.End().ToString()

if thisStart < otherStart {
if thisEnd == otherStart || isAdjacent(this.end, other.Start()) {
Expand Down Expand Up @@ -146,16 +147,21 @@ func (this span) Compare(other Span) SpanComparisonResult {
return After
}

func isAdjacent(this Key, other Key) bool {
return len(this.String()) == len(other.String()) && (this.PrefixEnd().String() == other.String() || this.String() == other.PrefixEnd().String())
func isAdjacent(this DataStoreKey, other DataStoreKey) bool {
return len(this.ToString()) == len(other.ToString()) && (this.PrefixEnd().ToString() == other.ToString() || this.ToString() == other.PrefixEnd().ToString())
}

// Spans is a collection of individual spans
type Spans []Span

// KeyValue is a KV store response containing the resulting ds.Key and byte array value
// KeyValue is a KV store response containing the resulting core.Key and byte array value
type KeyValue struct {
Key Key
Key DataStoreKey
Value []byte
}

type HeadKeyValue struct {
Key HeadStoreKey
Value []byte
}

Expand Down Expand Up @@ -200,7 +206,7 @@ func (spans Spans) MergeAscending() Spans {
uniqueSpanFound = true
i++
case StartBeforeEndAfter:
uniqueSpans = uniqueSpans.removeBefore(i, span.End().String())
uniqueSpans = uniqueSpans.removeBefore(i, span.End().ToString())
uniqueSpans[i] = NewSpan(span.Start(), span.End())
uniqueSpanFound = true
// Exit the unique-span loop, this span has been handled
Expand All @@ -210,7 +216,7 @@ func (spans Spans) MergeAscending() Spans {
// Do nothing, span is contained within an existing unique-span
i = len(uniqueSpans)
case StartEqualEndAfter, StartWithinEndAfter, StartEqualToEndEndAfter:
uniqueSpans = uniqueSpans.removeBefore(i, span.End().String())
uniqueSpans = uniqueSpans.removeBefore(i, span.End().ToString())
uniqueSpans[i] = NewSpan(uniqueSpan.Start(), span.End())
uniqueSpanFound = true
// Exit the unique-span loop, this span has been handled
Expand All @@ -234,7 +240,7 @@ func (spans Spans) MergeAscending() Spans {
func (spans Spans) removeBefore(startIndex int, end string) Spans {
indexOfLastMatchingItem := -1
for i := startIndex; i < len(spans); i++ {
if spans[i].End().String() <= end {
if spans[i].End().ToString() <= end {
indexOfLastMatchingItem = i
}
}
Expand Down
Loading

0 comments on commit 68783bb

Please sign in to comment.