Skip to content

Commit

Permalink
fix: Add object marker to enable return of empty docs (#800)
Browse files Browse the repository at this point in the history
This PR add an object marker to stored documents. This helps the iterator return an empty documents.
  • Loading branch information
fredcarle authored Sep 15, 2022
1 parent 57a9394 commit 39033f1
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 38 deletions.
28 changes: 27 additions & 1 deletion core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,16 @@ func NewDataStoreKey(key string) DataStoreKey {
}

elements := strings.Split(key, "/")
numberOfElements := len(elements)

if isDataObjectMarker(elements) {
return DataStoreKey{
CollectionId: elements[3],
InstanceType: ValueKey,
DocKey: elements[5],
}
}

numberOfElements := len(elements)
return DataStoreKey{
CollectionId: elements[numberOfElements-4],
InstanceType: InstanceType(elements[numberOfElements-3]),
Expand Down Expand Up @@ -440,3 +448,21 @@ func bytesPrefixEnd(b []byte) []byte {
// maximal byte string (i.e. already \xff...).
return b
}

func isDataObjectMarker(elements []string) bool {
numElements := len(elements)
// lenght is 6 because it has no FieldID
if numElements != 6 {
return false
}
if elements[1] != "db" {
return false
}
if elements[2] != "data" {
return false
}
if elements[4] != "v" {
return false
}
return true
}
28 changes: 27 additions & 1 deletion db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,21 @@ func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.
return ErrDocumentAlreadyExists
}

// write object marker
// write primary key object marker
err = txn.Datastore().Put(ctx, key.ToDS(), []byte{base.ObjectMarker})
if err != nil {
return err
}

// write value object marker if we have an empty doc
if len(doc.Values()) == 0 {
valueKey := c.getDatastoreFromDocKey(dockey)
err = txn.Datastore().Put(ctx, valueKey.ToDS(), []byte{base.ObjectMarker})
if err != nil {
return err
}
}

// write data to DB via MerkleClock/CRDT
_, err = c.save(ctx, txn, doc)

Expand Down Expand Up @@ -709,6 +719,14 @@ func (c *collection) deleteWithPrefix(ctx context.Context, txn datastore.Txn, ke
Prefix: key.ToString(),
KeysOnly: true,
}

if key.InstanceType == core.ValueKey {
err := txn.Datastore().Delete(ctx, core.NewDataStoreKey(key.ToString()).ToDS())
if err != nil {
return false, err
}
}

res, err := txn.Datastore().Query(ctx, q)

for e := range res.Next() {
Expand Down Expand Up @@ -885,6 +903,14 @@ func (c *collection) getPrimaryKeyFromDocKey(docKey client.DocKey) core.PrimaryD
}
}

func (c *collection) getDatastoreFromDocKey(docKey client.DocKey) core.DataStoreKey {
return core.DataStoreKey{
CollectionId: fmt.Sprint(c.colID),
DocKey: docKey.String(),
InstanceType: core.ValueKey,
}
}

func (c *collection) tryGetFieldKey(key core.PrimaryDataStoreKey, fieldName string) (core.DataStoreKey, bool) {
fieldId, hasField := c.tryGetSchemaFieldID(fieldName)
if !hasField {
Expand Down
59 changes: 27 additions & 32 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,43 +202,33 @@ func (df *DocumentFetcher) ProcessKV(kv *core.KeyValue) error {
// It returns true if the current doc is completed
func (df *DocumentFetcher) nextKey(ctx context.Context) (spanDone bool, err error) {
// get the next kv from nextKV()
for {
spanDone, df.kv, err = df.nextKV()
// handle any internal errors
if err != nil {
return false, err
}

if df.kv != nil && df.kv.Key.InstanceType != core.ValueKey {
// We can only ready value values, if we escape the collection's value keys
// then we must be done and can stop reading
spanDone = true
}
spanDone, df.kv, err = df.nextKV()
// handle any internal errors
if err != nil {
return false, err
}

df.kvEnd = spanDone
if df.kvEnd {
hasNextSpan, err := df.startNextSpan(ctx)
if err != nil {
return false, err
}
if hasNextSpan {
return true, nil
}
return true, nil
}
if df.kv != nil && df.kv.Key.InstanceType != core.ValueKey {
// We can only ready value values, if we escape the collection's value keys
// then we must be done and can stop reading
spanDone = true
}

// skip object markers
if bytes.Equal(df.kv.Value, []byte{base.ObjectMarker}) {
continue
df.kvEnd = spanDone
if df.kvEnd {
_, err := df.startNextSpan(ctx)
if err != nil {
return false, err
}
return true, nil
}

// check if we've crossed document boundries
if df.doc.Key != nil && df.kv.Key.DocKey != string(df.doc.Key) {
df.isReadingDocument = false
return true, nil
}
return false, nil
// check if we've crossed document boundries
if df.doc.Key != nil && df.kv.Key.DocKey != string(df.doc.Key) {
df.isReadingDocument = false
return true, nil
}
return false, nil
}

// nextKV is a lower-level utility compared to nextKey. The differences are as follows:
Expand Down Expand Up @@ -281,6 +271,11 @@ func (df *DocumentFetcher) processKV(kv *core.KeyValue) error {
df.doc.Key = []byte(kv.Key.DocKey)
}

// we have to skip the object marker
if bytes.Equal(df.kv.Value, []byte{base.ObjectMarker}) {
return nil
}

// extract the FieldID and update the encoded doc properties map
fieldID, err := kv.Key.FieldID()
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions docs/data_format_changes/i610-empty-doc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Add object marker to enable return of empty docs

The object marker had previously been removed and replaces with the primary key instance type object. This reintroduces the object marker in the value instance type to allow the datastore iterator to find empty documents.
14 changes: 10 additions & 4 deletions tests/integration/query/simple/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,13 @@ func TestQuerySimpleWithSomeDefaultValues(t *testing.T) {
executeTestCase(t, test)
}

// This test documents undesirable behaviour and should be altered
// with https://github.com/sourcenetwork/defradb/issues/610.
// A document with nil fields should be returned.
func TestQuerySimpleWithDefaultValue(t *testing.T) {
test := testUtils.QueryTestCase{
Description: "Simple query with default-value fields",
Query: `query {
users {
Name
Email
Age
HeightM
Verified
Expand All @@ -177,7 +175,15 @@ func TestQuerySimpleWithDefaultValue(t *testing.T) {
`{ }`,
},
},
Results: []map[string]interface{}{},
Results: []map[string]interface{}{
{
"Name": nil,
"Email": nil,
"Age": nil,
"HeightM": nil,
"Verified": nil,
},
},
}

executeTestCase(t, test)
Expand Down

0 comments on commit 39033f1

Please sign in to comment.