From 6c0c5bdfbfd2ada5621b4d3eaea4e383c42e306a Mon Sep 17 00:00:00 2001
From: John-Alan Simmons
Date: Tue, 27 Jun 2023 17:41:13 -0400
Subject: [PATCH] refactor: Fetcher filter and field optimization (#1500)

## Relevant issue(s)

Resolves #490
Resolves #1582 (indirectly)

## Description

This is a reduced version of #491. It takes a very different approach, and
tries to keep as much of the existing Fetcher structure as possible.

Basically, this will try to eagerly ignore documents that don't pass the given
filter at the fetcher level. This means we can apply various optimizations
that weren't possible when the filter was applied at the scanNode level, as
before.

---
 client/descriptions.go | 24 +-
 core/data.go | 6 -
 db/collection.go | 2 +-
 db/collection_get.go | 4 +-
 db/collection_index.go | 16 +-
 db/collection_update.go | 4 +-
 db/fetcher/encoded_doc.go | 71 +++-
 db/fetcher/errors.go | 2 +
 db/fetcher/fetcher.go | 331 ++++++++++++++----
 db/fetcher/mocks/EncodedDocument.go | 46 ++-
 db/fetcher/mocks/Fetcher.go | 26 +-
 db/fetcher/mocks/utils.go | 9 +-
 db/fetcher/versioned.go | 7 +-
 db/fetcher_test.go | 15 +-
 db/indexed_docs_test.go | 17 +-
 go.mod | 1 +
 go.sum | 2 +
 net/process.go | 2 +-
 planner/commit.go | 2 +-
 planner/datasource.go | 4 +-
 planner/mapper/errors.go | 2 +
 planner/mapper/mapper.go | 170 ++++++---
 planner/mapper/targetable.go | 8 +-
 planner/planner.go | 6 +-
 planner/scan.go | 124 +++++--
 planner/sum.go | 4 +-
 planner/top.go | 1 +
 planner/type_join.go | 27 +-
 request/graphql/parser/errors.go | 1 +
 request/graphql/parser/filter.go | 61 ++++
 .../explain/default/delete_test.go | 4 +-
 .../explain/execute/create_test.go | 5 +-
 .../explain/execute/delete_test.go | 10 +-
 .../integration/explain/execute/group_test.go | 5 +-
 .../integration/explain/execute/scan_test.go | 20 +-
 .../explain/execute/top_level_test.go | 15 +-
 .../explain/execute/type_join_test.go | 20 +-
 .../explain/execute/update_test.go | 10 +-
 .../explain/execute/with_average_test.go | 10 +-
 .../explain/execute/with_count_test.go | 5 +-
 .../explain/execute/with_limit_test.go | 10 +-
 .../explain/execute/with_order_test.go | 25 +-
 .../explain/execute/with_sum_test.go | 10 +-
 .../with_group_related_id_alias_test.go | 3 -
 .../one_to_one_to_one/with_order_test.go | 123 +++++++
 .../query/simple/with_group_test.go | 1 +
 46 files changed, 906 insertions(+), 365 deletions(-)
 create mode 100644 tests/integration/query/one_to_one_to_one/with_order_test.go

diff --git a/client/descriptions.go b/client/descriptions.go
index 6e4f30b1bb..5d6a37bacc 100644
--- a/client/descriptions.go
+++ b/client/descriptions.go
@@ -39,18 +39,6 @@ func (col CollectionDescription) IDString() string {
 	return fmt.Sprint(col.ID)
 }
 
-// GetField returns the field of the given name.
-func (col CollectionDescription) GetField(name string) (FieldDescription, bool) {
-	if !col.Schema.IsEmpty() {
-		for _, field := range col.Schema.Fields {
-			if field.Name == name {
-				return field, true
-			}
-		}
-	}
-	return FieldDescription{}, false
-}
-
 // GetFieldByID searches for a field with the given ID. If such a field is found it
 // will return it and true, if it is not found it will return false.
 func (col CollectionDescription) GetFieldByID(id FieldID) (FieldDescription, bool) {
@@ -118,6 +106,18 @@ func (sd SchemaDescription) GetFieldKey(fieldName string) uint32 {
 	return uint32(0)
 }
 
+// GetField returns the field of the given name.
+func (sd SchemaDescription) GetField(name string) (FieldDescription, bool) { + if !sd.IsEmpty() { + for _, field := range sd.Fields { + if field.Name == name { + return field, true + } + } + } + return FieldDescription{}, false +} + // FieldKind describes the type of a field. type FieldKind uint8 diff --git a/core/data.go b/core/data.go index aee4cf64ed..a756d41f91 100644 --- a/core/data.go +++ b/core/data.go @@ -156,12 +156,6 @@ func NewSpans(spans ...Span) Spans { } } -// KeyValue is a KV store response containing the resulting core.Key and byte array value. -type KeyValue struct { - Key DataStoreKey - Value []byte -} - // HeadKeyValue is a KV store response containing the resulting core.HeadStoreKey // and byte array value. type HeadKeyValue struct { diff --git a/db/collection.go b/db/collection.go index 2f53ef9bb2..0db353fff1 100644 --- a/db/collection.go +++ b/db/collection.go @@ -868,7 +868,7 @@ func (c *collection) save( return cid.Undef, client.NewErrFieldNotExist(k) } - fieldDescription, valid := c.desc.GetField(k) + fieldDescription, valid := c.desc.Schema.GetField(k) if !valid { return cid.Undef, client.NewErrFieldNotExist(k) } diff --git a/db/collection_get.go b/db/collection_get.go index 836842d094..e26b6c77bb 100644 --- a/db/collection_get.go +++ b/db/collection_get.go @@ -47,14 +47,14 @@ func (c *collection) get( ctx context.Context, txn datastore.Txn, key core.PrimaryDataStoreKey, - fields []*client.FieldDescription, + fields []client.FieldDescription, showDeleted bool, ) (*client.Document, error) { // create a new document fetcher df := c.newFetcher() desc := &c.desc // initialize it with the primary index - err := df.Init(&c.desc, fields, false, showDeleted) + err := df.Init(&c.desc, fields, nil, nil, false, showDeleted) if err != nil { _ = df.Close() return nil, err diff --git a/db/collection_index.go b/db/collection_index.go index 970c812413..4224b80bee 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -128,12 +128,12 @@ func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *cl } // collectIndexedFields returns all fields that are indexed by all collection indexes. 
-func (c *collection) collectIndexedFields() []*client.FieldDescription { - fieldsMap := make(map[string]*client.FieldDescription) +func (c *collection) collectIndexedFields() []client.FieldDescription { + fieldsMap := make(map[string]client.FieldDescription) for _, index := range c.indexes { for _, field := range index.Description().Fields { for i := range c.desc.Schema.Fields { - colField := &c.desc.Schema.Fields[i] + colField := c.desc.Schema.Fields[i] if field.Name == colField.Name { fieldsMap[field.Name] = colField break @@ -141,7 +141,7 @@ func (c *collection) collectIndexedFields() []*client.FieldDescription { } } } - fields := make([]*client.FieldDescription, 0, len(fieldsMap)) + fields := make([]client.FieldDescription, 0, len(fieldsMap)) for _, field := range fieldsMap { fields = append(fields, field) } @@ -260,11 +260,11 @@ func (c *collection) createIndex( func (c *collection) iterateAllDocs( ctx context.Context, txn datastore.Txn, - fields []*client.FieldDescription, + fields []client.FieldDescription, exec func(doc *client.Document) error, ) error { df := c.newFetcher() - err := df.Init(&c.desc, fields, false, false) + err := df.Init(&c.desc, fields, nil, nil, false, false) if err != nil { _ = df.Close() return err @@ -302,10 +302,10 @@ func (c *collection) indexExistingDocs( txn datastore.Txn, index CollectionIndex, ) error { - fields := make([]*client.FieldDescription, 0, 1) + fields := make([]client.FieldDescription, 0, 1) for _, field := range index.Description().Fields { for i := range c.desc.Schema.Fields { - colField := &c.desc.Schema.Fields[i] + colField := c.desc.Schema.Fields[i] if field.Name == colField.Name { fields = append(fields, colField) break diff --git a/db/collection_update.go b/db/collection_update.go index cb1a4d70f8..09f9d6e3ae 100644 --- a/db/collection_update.go +++ b/db/collection_update.go @@ -305,7 +305,7 @@ func (c *collection) applyMerge( return ErrInvalidMergeValueType } - fd, valid := c.desc.GetField(mfield) + fd, valid := c.desc.Schema.GetField(mfield) if !valid { return client.NewErrFieldNotExist(mfield) } @@ -398,7 +398,7 @@ func (c *collection) isSecondaryIDField(fieldDesc client.FieldDescription) (clie return client.FieldDescription{}, false } - relationFieldDescription, valid := c.Description().GetField( + relationFieldDescription, valid := c.Description().Schema.GetField( strings.TrimSuffix(fieldDesc.Name, request.RelatedObjectID), ) return relationFieldDescription, valid && !relationFieldDescription.IsPrimaryRelation() diff --git a/db/fetcher/encoded_doc.go b/db/fetcher/encoded_doc.go index e12fdd1e71..76a87ec2d2 100644 --- a/db/fetcher/encoded_doc.go +++ b/db/fetcher/encoded_doc.go @@ -13,6 +13,7 @@ package fetcher import ( "fmt" + "github.com/bits-and-blooms/bitset" "github.com/fxamacker/cbor/v2" "github.com/sourcenetwork/immutable" @@ -24,12 +25,12 @@ type EncodedDocument interface { // Key returns the key of the document Key() []byte // Reset re-initializes the EncodedDocument object. 
-	Reset(newKey []byte)
+	Reset()
 	// Decode returns a properly decoded document object
 	Decode() (*client.Document, error)
 	// DecodeToDoc returns a decoded document as a
 	// map of field/value pairs
-	DecodeToDoc(*core.DocumentMapping) (core.Doc, error)
+	DecodeToDoc() (core.Doc, error)
 }
 
 type EPTuple []encProperty
@@ -39,6 +40,10 @@ type encProperty struct {
 	Desc client.FieldDescription
 	Raw  []byte
 
+	// Filter flag to determine if this field
+	// is needed for eager filter evaluation
+	IsFilter bool
+
 	// // encoding meta data
 	// encoding base.DataEncoding
 }
@@ -190,8 +195,20 @@ func convertToInt(propertyName string, untypedValue any) (int64, error) {
 
 // @todo: Implement Encoded Document type
 type encodedDocument struct {
+	mapping *core.DocumentMapping
+	doc     *core.Doc
+
 	key        []byte
-	properties map[client.FieldDescription]*encProperty
+	Properties []*encProperty
+
+	// tracking bitsets
+	// A value of 1 indicates a required field
+	// 0 means we ignore the field
+	// we update the bitsets as we collect values
+	// by setting the bit for the FieldID
+	filterSet *bitset.BitSet // filter fields
+	selectSet *bitset.BitSet // select fields
+
 }
 
 var _ EncodedDocument = (*encodedDocument)(nil)
@@ -201,9 +218,15 @@ func (encdoc *encodedDocument) Key() []byte {
 }
 
 // Reset re-initializes the EncodedDocument object.
-func (encdoc *encodedDocument) Reset(newKey []byte) {
-	encdoc.properties = make(map[client.FieldDescription]*encProperty)
-	encdoc.key = newKey
+func (encdoc *encodedDocument) Reset() {
+	encdoc.Properties = make([]*encProperty, 0)
+	encdoc.key = nil
+	if encdoc.mapping != nil {
+		doc := encdoc.mapping.NewDoc()
+		encdoc.doc = &doc
+	}
+	encdoc.filterSet = nil
+	encdoc.selectSet = nil
 }
 
 // Decode returns a properly decoded document object
@@ -213,12 +236,12 @@ func (encdoc *encodedDocument) Decode() (*client.Document, error) {
 		return nil, err
 	}
 	doc := client.NewDocWithKey(key)
-	for fieldDesc, prop := range encdoc.properties {
+	for _, prop := range encdoc.Properties {
 		ctype, val, err := prop.Decode()
 		if err != nil {
 			return nil, err
 		}
-		err = doc.SetAs(fieldDesc.Name, val, ctype)
+		err = doc.SetAs(prop.Desc.Name, val, ctype)
 		if err != nil {
 			return nil, err
 		}
@@ -229,15 +252,35 @@
 
 // DecodeToDoc returns a decoded document as a
 // map of field/value pairs
-func (encdoc *encodedDocument) DecodeToDoc(mapping *core.DocumentMapping) (core.Doc, error) {
-	doc := mapping.NewDoc()
-	doc.SetKey(string(encdoc.key))
-	for fieldDesc, prop := range encdoc.properties {
+func (encdoc *encodedDocument) DecodeToDoc() (core.Doc, error) {
+	return encdoc.decodeToDoc(false)
+}
+
+func (encdoc *encodedDocument) decodeToDocForFilter() (core.Doc, error) {
+	return encdoc.decodeToDoc(true)
+}
+
+func (encdoc *encodedDocument) decodeToDoc(filter bool) (core.Doc, error) {
+	if encdoc.mapping == nil {
+		return core.Doc{}, ErrMissingMapper
+	}
+	if encdoc.doc == nil {
+		doc := encdoc.mapping.NewDoc()
+		encdoc.doc = &doc
+	}
+	encdoc.doc.SetKey(string(encdoc.key))
+	for _, prop := range encdoc.Properties {
+		if encdoc.doc.Fields[prop.Desc.ID] != nil { // use cached decoded fields
+			continue
+		}
+		if filter && !prop.IsFilter { // only get filter fields if filter=true
+			continue
+		}
 		_, val, err := prop.Decode()
 		if err != nil {
 			return core.Doc{}, err
 		}
-		doc.Fields[fieldDesc.ID] = val
+		encdoc.doc.Fields[prop.Desc.ID] = val
 	}
-	return doc, nil
+	return *encdoc.doc, nil
 }
diff --git a/db/fetcher/errors.go b/db/fetcher/errors.go
index 31453e8ad6..84d947c46f 100644
--- a/db/fetcher/errors.go
+++ b/db/fetcher/errors.go
@@ -25,6 +25,7 @@ const (
 	errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf"
 	errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG"
 	errFailedToGetDagNode         string = "failed to get DAG Node"
+	errMissingMapper              string = "missing document mapper"
 )
 
 var (
@@ -38,6 +39,7 @@ var (
 	ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode)
 	ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink)
 	ErrFailedToGetDagNode         = errors.New(errFailedToGetDagNode)
+	ErrMissingMapper              = errors.New(errMissingMapper)
 	ErrSingleSpanOnly             = errors.New("spans must contain only a single entry")
 )
diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go
index 9b052c24a4..4e4dafba32 100644
--- a/db/fetcher/fetcher.go
+++ b/db/fetcher/fetcher.go
@@ -13,7 +13,9 @@ package fetcher
 import (
 	"bytes"
 	"context"
+	"strings"
 
+	"github.com/bits-and-blooms/bitset"
 	dsq "github.com/ipfs/go-datastore/query"
 
 	"github.com/sourcenetwork/defradb/client"
@@ -21,12 +23,21 @@ import (
 	"github.com/sourcenetwork/defradb/datastore"
 	"github.com/sourcenetwork/defradb/datastore/iterable"
 	"github.com/sourcenetwork/defradb/db/base"
+	"github.com/sourcenetwork/defradb/planner/mapper"
+	"github.com/sourcenetwork/defradb/request/graphql/parser"
 )
 
 // Fetcher is the interface for collecting documents from the underlying data store.
 // It handles all the key/value scanning, aggregation, and document encoding.
 type Fetcher interface {
-	Init(col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, showDeleted bool) error
+	Init(
+		col *client.CollectionDescription,
+		fields []client.FieldDescription,
+		filter *mapper.Filter,
+		docmapper *core.DocumentMapping,
+		reverse bool,
+		showDeleted bool,
+	) error
 	Start(ctx context.Context, txn datastore.Txn, spans core.Spans) error
 	FetchNext(ctx context.Context) (EncodedDocument, error)
 	FetchNextDecoded(ctx context.Context) (*client.Document, error)
@@ -34,6 +45,12 @@ type Fetcher interface {
 	Close() error
 }
 
+// keyValue is a KV store response containing the resulting core.DataStoreKey and byte array value.
+type keyValue struct {
+	Key   core.DataStoreKey
+	Value []byte
+}
+
 var (
 	_ Fetcher = (*DocumentFetcher)(nil)
 )
@@ -48,14 +65,35 @@ type DocumentFetcher struct {
 	order        []dsq.Order
 	curSpanIndex int
 
-	schemaFields map[uint32]client.FieldDescription
-	fields       []*client.FieldDescription
+	filter       *mapper.Filter
+	ranFilter    bool // did we run the filter
+	passedFilter bool // did we pass the filter
+
+	filterFields map[uint32]client.FieldDescription
+	selectFields map[uint32]client.FieldDescription
+
+	// static bitset which stores the IDs of fields
+	// needed for filtering.
+	//
+	// This is compared against the encdoc.filterSet which
+	// is a dynamic bitset that gets updated as fields are
+	// added to the encdoc, and cleared on reset.
+	//
+	// We compare the two bitsets to determine if we've collected
+	// all the necessary fields to run the filter.
+	//
+	// This is *much* more efficient for comparison than most (any?)
+	// other approach.
+	//
+	// When proper seek() is added, this will also be responsible
+	// for efficiently finding the next field to seek to.
+ filterSet *bitset.BitSet + + doc *encodedDocument - doc *encodedDocument - decodedDoc *client.Document initialized bool - kv *core.KeyValue + kv *keyValue kvIter iterable.Iterator kvResultsIter dsq.Results kvEnd bool @@ -70,7 +108,9 @@ type DocumentFetcher struct { // Init implements DocumentFetcher. func (df *DocumentFetcher) Init( col *client.CollectionDescription, - fields []*client.FieldDescription, + fields []client.FieldDescription, + filter *mapper.Filter, + docmapper *core.DocumentMapping, reverse bool, showDeleted bool, ) error { @@ -78,7 +118,7 @@ func (df *DocumentFetcher) Init( return client.NewErrUninitializeProperty("DocumentFetcher", "Schema") } - err := df.init(col, fields, reverse) + err := df.init(col, fields, filter, docmapper, reverse) if err != nil { return err } @@ -87,7 +127,7 @@ func (df *DocumentFetcher) Init( if df.deletedDocFetcher == nil { df.deletedDocFetcher = new(DocumentFetcher) } - return df.deletedDocFetcher.init(col, fields, reverse) + return df.deletedDocFetcher.init(col, fields, filter, docmapper, reverse) } return nil @@ -95,15 +135,22 @@ func (df *DocumentFetcher) Init( func (df *DocumentFetcher) init( col *client.CollectionDescription, - fields []*client.FieldDescription, + fields []client.FieldDescription, + filter *mapper.Filter, + docMapper *core.DocumentMapping, reverse bool, ) error { df.col = col - df.fields = fields df.reverse = reverse df.initialized = true + df.filter = filter df.isReadingDocument = false df.doc = new(encodedDocument) + df.doc.mapping = docMapper + + if df.filter != nil && docMapper == nil { + return ErrMissingMapper + } if df.kvResultsIter != nil { if err := df.kvResultsIter.Close(); err != nil { @@ -118,10 +165,34 @@ func (df *DocumentFetcher) init( } df.kvIter = nil - df.schemaFields = make(map[uint32]client.FieldDescription) - for _, field := range col.Schema.Fields { - df.schemaFields[uint32(field.ID)] = field + df.selectFields = make(map[uint32]client.FieldDescription, len(fields)) + // if we haven't been told to get specific fields + // get them all + var targetFields []client.FieldDescription + if len(fields) == 0 { + targetFields = df.col.Schema.Fields + } else { + targetFields = fields + } + + for _, field := range targetFields { + df.selectFields[uint32(field.ID)] = field + } + + if df.filter != nil { + conditions := df.filter.ToMap(df.doc.mapping) + parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Schema) + if err != nil { + return err + } + df.filterFields = make(map[uint32]client.FieldDescription, len(parsedfilterFields)) + df.filterSet = bitset.New(uint(len(col.Schema.Fields))) + for _, field := range parsedfilterFields { + df.filterFields[uint32(field.ID)] = field + df.filterSet.Set(uint(field.ID)) + } } + return nil } @@ -218,39 +289,36 @@ func (df *DocumentFetcher) startNextSpan(ctx context.Context) (bool, error) { } df.curSpanIndex = nextSpanIndex - _, err = df.nextKey(ctx) + _, _, err = df.nextKey(ctx, false) return err == nil, err } -func (df *DocumentFetcher) KVEnd() bool { - return df.kvEnd -} - -func (df *DocumentFetcher) KV() *core.KeyValue { - return df.kv -} - -func (df *DocumentFetcher) NextKey(ctx context.Context) (docDone bool, err error) { - return df.nextKey(ctx) -} - -func (df *DocumentFetcher) NextKV() (iterDone bool, kv *core.KeyValue, err error) { - return df.nextKV() -} - -func (df *DocumentFetcher) ProcessKV(kv *core.KeyValue) error { - return df.processKV(kv) -} - // nextKey gets the next kv. It sets both kv and kvEnd internally. 
-// It returns true if the current doc is completed
-func (df *DocumentFetcher) nextKey(ctx context.Context) (spanDone bool, err error) {
-	// get the next kv from nextKV()
-	spanDone, df.kv, err = df.nextKV()
-	// handle any internal errors
-	if err != nil {
-		return false, err
+// It returns true if the current doc is completed.
+// The first call to nextKey CANNOT have seekNext be true (ErrFailedToSeek)
+func (df *DocumentFetcher) nextKey(ctx context.Context, seekNext bool) (spanDone bool, docDone bool, err error) {
+	// safety against seekNext on first call
+	if seekNext && df.kv == nil {
+		return false, false, ErrFailedToSeek
+	}
+
+	if seekNext {
+		curKey := df.kv.Key
+		curKey.FieldId = "" // clear field so prefixEnd applies to dockey
+		seekKey := curKey.PrefixEnd().ToString()
+		spanDone, df.kv, err = df.seekKV(seekKey)
+		// handle any internal errors
+		if err != nil {
+			return false, false, err
+		}
+	} else {
+		spanDone, df.kv, err = df.nextKV()
+		// handle any internal errors
+		if err != nil {
+			return false, false, err
+		}
 	}
+
 	if df.kv != nil && (df.kv.Key.InstanceType != core.ValueKey && df.kv.Key.InstanceType != core.DeletedKey) {
 		// We can only read value values, if we escape the collection's value keys
 		// then we must be done and can stop reading
@@ -259,50 +327,99 @@
 	df.kvEnd = spanDone
 	if df.kvEnd {
-		_, err := df.startNextSpan(ctx)
+		moreSpans, err := df.startNextSpan(ctx)
 		if err != nil {
-			return false, err
+			return false, false, err
 		}
-		return true, nil
+		df.isReadingDocument = false
+		return !moreSpans, true, nil
 	}
 
 	// check if we've crossed document boundaries
-	if df.doc.Key() != nil && df.kv.Key.DocKey != string(df.doc.Key()) {
+	if (df.doc.key != nil && df.kv.Key.DocKey != string(df.doc.key)) || seekNext {
 		df.isReadingDocument = false
-		return true, nil
+		return false, true, nil
 	}
-	return false, nil
+	return false, false, nil
 }
 
 // nextKV is a lower-level utility compared to nextKey. The differences are as follows:
 // - It directly interacts with the KVIterator.
 // - Returns true if the entire iterator/span is exhausted
 // - Returns a kv pair instead of internally updating
-func (df *DocumentFetcher) nextKV() (iterDone bool, kv *core.KeyValue, err error) {
+func (df *DocumentFetcher) nextKV() (iterDone bool, kv *keyValue, err error) {
+	done, dsKey, res, err := df.nextKVRaw()
+	if done || err != nil {
+		return done, nil, err
+	}
+
+	kv = &keyValue{
+		Key:   dsKey,
+		Value: res.Value,
+	}
+	return false, kv, nil
+}
+
+// seekKV will seek through results/iterator until it reaches
+// the target key, or if the target key doesn't exist, the
+// smallest key that is greater than the target.
+func (df *DocumentFetcher) seekKV(key string) (bool, *keyValue, error) {
+	// make sure the current kv is *before* the target key
+	switch strings.Compare(df.kv.Key.ToString(), key) {
+	case 0:
+		// equal, we should just return the kv state
+		return df.kvEnd, df.kv, nil
+	case 1:
+		// greater, error
+		return false, nil, NewErrFailedToSeek(key, nil)
+	}
+
+	for {
+		done, dsKey, res, err := df.nextKVRaw()
+		if done || err != nil {
+			return done, nil, err
+		}
+
+		switch strings.Compare(dsKey.ToString(), key) {
+		case -1:
+			// before, so let's seek again
+			continue
+		case 0, 1:
+			// equal or greater (first), return a formatted kv
+			kv := &keyValue{
+				Key:   dsKey,
+				Value: res.Value, // @todo make lazy
+			}
+			return false, kv, nil
+		}
+	}
+}
+
+// nextKVRaw is a lower-level utility compared to nextKey.
The differences are as follows: +// - It directly interacts with the KVIterator. +// - Returns true if the entire iterator/span is exhausted +// - Returns a kv pair instead of internally updating +func (df *DocumentFetcher) nextKVRaw() (bool, core.DataStoreKey, dsq.Result, error) { res, available := df.kvResultsIter.NextSync() if !available { - return true, nil, nil + return true, core.DataStoreKey{}, res, nil } - err = res.Error + err := res.Error if err != nil { - return true, nil, err + return true, core.DataStoreKey{}, res, err } dsKey, err := core.NewDataStoreKey(res.Key) if err != nil { - return true, nil, err + return true, core.DataStoreKey{}, res, err } - kv = &core.KeyValue{ - Key: dsKey, - Value: res.Value, - } - return false, kv, nil + return false, dsKey, res, nil } // processKV continuously processes the key value pairs we've received // and step by step constructs the current encoded document -func (df *DocumentFetcher) processKV(kv *core.KeyValue) error { +func (df *DocumentFetcher) processKV(kv *keyValue) error { // skip MerkleCRDT meta-data priority key-value pair // implement here <-- // instance := kv.Key.Name() @@ -315,7 +432,18 @@ func (df *DocumentFetcher) processKV(kv *core.KeyValue) error { if !df.isReadingDocument { df.isReadingDocument = true - df.doc.Reset([]byte(kv.Key.DocKey)) + df.doc.Reset() + + // re-init doc state + if df.filterSet != nil { + df.doc.filterSet = bitset.New(df.filterSet.Len()) + if df.filterSet.Test(0) { + df.doc.filterSet.Set(0) // mark dockey as set + } + } + df.doc.key = []byte(kv.Key.DocKey) + df.passedFilter = false + df.ranFilter = false } // we have to skip the object marker @@ -328,21 +456,28 @@ func (df *DocumentFetcher) processKV(kv *core.KeyValue) error { if err != nil { return err } - fieldDesc, exists := df.schemaFields[fieldID] + fieldDesc, exists := df.selectFields[fieldID] if !exists { - return NewErrFieldIdNotFound(fieldID) + fieldDesc, exists = df.filterFields[fieldID] + if !exists { + return nil // if we can't find this field in our sets, just ignore it + } } - // @todo: Secondary Index might not have encoded FieldIDs - // @body: Need to generalized the processKV, and overall Fetcher architecture - // to better handle dynamic use cases beyond primary indexes. If a - // secondary index is provided, we need to extract the indexed/implicit fields - // from the KV pair. - df.doc.properties[fieldDesc] = &encProperty{ + ufid := uint(fieldID) + + property := &encProperty{ Desc: fieldDesc, Raw: kv.Value, } - // @todo: Extract Index implicit/stored keys + + if df.filterSet != nil && df.filterSet.Test(ufid) { + df.doc.filterSet.Set(ufid) + property.IsFilter = true + } + + df.doc.Properties = append(df.doc.Properties, property) + return nil } @@ -369,12 +504,58 @@ func (df *DocumentFetcher) FetchNext(ctx context.Context) (EncodedDocument, erro return nil, err } - end, err := df.nextKey(ctx) + if df.filter != nil { + // only run filter if we've collected all the fields + // required for filtering. This is tracked by the bitsets. 
+		if df.filterSet.Equal(df.doc.filterSet) {
+			filterDoc, err := df.doc.decodeToDocForFilter()
+			if err != nil {
+				return nil, err
+			}
+
+			df.ranFilter = true
+			df.passedFilter, err = mapper.RunFilter(filterDoc, df.filter)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	// if we ran the filter and didn't pass it,
+	// there's no point in collecting other select fields,
+	// so we seek to the next doc
+	spansDone, docDone, err := df.nextKey(ctx, !df.passedFilter && df.ranFilter)
 	if err != nil {
 		return nil, err
 	}
-	if end {
-		return df.doc, nil
+
+	if docDone {
+		if df.filter != nil {
+			// if we passed, return
+			if df.passedFilter {
+				return df.doc, nil
+			} else if !df.ranFilter { // if we didn't run, run it
+				decodedDoc, err := df.doc.DecodeToDoc()
+				if err != nil {
+					return nil, err
+				}
+				df.passedFilter, err = mapper.RunFilter(decodedDoc, df.filter)
+				if err != nil {
+					return nil, err
+				}
+				if df.passedFilter {
+					return df.doc, nil
+				}
+			}
+		} else {
+			return df.doc, nil
+		}
+
+		if !spansDone {
+			continue
+		}
+
+		return nil, nil
 	}
 
 	// // crossed document kv boundary?
@@ -397,12 +578,12 @@ func (df *DocumentFetcher) FetchNextDecoded(ctx context.Context) (*client.Docume
 		return nil, nil
 	}
 
-	df.decodedDoc, err = encdoc.Decode()
+	decodedDoc, err := encdoc.Decode()
 	if err != nil {
 		return nil, err
 	}
 
-	return df.decodedDoc, nil
+	return decodedDoc, nil
 }
 
 // FetchNextDoc returns the next document as a core.Doc.
@@ -455,7 +636,7 @@ func (df *DocumentFetcher) FetchNextDoc(
 		status = client.Active
 	}
 
-	doc, err := encdoc.DecodeToDoc(mapping)
+	doc, err := encdoc.DecodeToDoc()
 	if err != nil {
 		return nil, core.Doc{}, err
 	}
diff --git a/db/fetcher/mocks/EncodedDocument.go b/db/fetcher/mocks/EncodedDocument.go
index d6b88fb3c9..6ffb6b4478 100644
--- a/db/fetcher/mocks/EncodedDocument.go
+++ b/db/fetcher/mocks/EncodedDocument.go
@@ -75,23 +75,23 @@ func (_c *EncodedDocument_Decode_Call) RunAndReturn(run func() (*client.Document
 	return _c
 }
 
-// DecodeToDoc provides a mock function with given fields: _a0
-func (_m *EncodedDocument) DecodeToDoc(_a0 *core.DocumentMapping) (core.Doc, error) {
-	ret := _m.Called(_a0)
+// DecodeToDoc provides a mock function with given fields:
+func (_m *EncodedDocument) DecodeToDoc() (core.Doc, error) {
+	ret := _m.Called()
 
 	var r0 core.Doc
 	var r1 error
-	if rf, ok := ret.Get(0).(func(*core.DocumentMapping) (core.Doc, error)); ok {
-		return rf(_a0)
+	if rf, ok := ret.Get(0).(func() (core.Doc, error)); ok {
+		return rf()
 	}
-	if rf, ok := ret.Get(0).(func(*core.DocumentMapping) core.Doc); ok {
-		r0 = rf(_a0)
+	if rf, ok := ret.Get(0).(func() core.Doc); ok {
+		r0 = rf()
 	} else {
 		r0 = ret.Get(0).(core.Doc)
 	}
 
-	if rf, ok := ret.Get(1).(func(*core.DocumentMapping) error); ok {
-		r1 = rf(_a0)
+	if rf, ok := ret.Get(1).(func() error); ok {
+		r1 = rf()
 	} else {
 		r1 = ret.Error(1)
 	}
@@ -105,14 +105,13 @@ type EncodedDocument_DecodeToDoc_Call struct {
 }
 
 // DecodeToDoc is a helper method to define mock.On call
-//   - _a0 *core.DocumentMapping
-func (_e *EncodedDocument_Expecter) DecodeToDoc(_a0 interface{}) *EncodedDocument_DecodeToDoc_Call {
-	return &EncodedDocument_DecodeToDoc_Call{Call: _e.mock.On("DecodeToDoc", _a0)}
+func (_e *EncodedDocument_Expecter) DecodeToDoc() *EncodedDocument_DecodeToDoc_Call {
+	return &EncodedDocument_DecodeToDoc_Call{Call: _e.mock.On("DecodeToDoc")}
 }
 
-func (_c *EncodedDocument_DecodeToDoc_Call) Run(run func(_a0 *core.DocumentMapping)) *EncodedDocument_DecodeToDoc_Call {
+func (_c *EncodedDocument_DecodeToDoc_Call) Run(run func())
*EncodedDocument_DecodeToDoc_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*core.DocumentMapping)) + run() }) return _c } @@ -122,7 +121,7 @@ func (_c *EncodedDocument_DecodeToDoc_Call) Return(_a0 core.Doc, _a1 error) *Enc return _c } -func (_c *EncodedDocument_DecodeToDoc_Call) RunAndReturn(run func(*core.DocumentMapping) (core.Doc, error)) *EncodedDocument_DecodeToDoc_Call { +func (_c *EncodedDocument_DecodeToDoc_Call) RunAndReturn(run func() (core.Doc, error)) *EncodedDocument_DecodeToDoc_Call { _c.Call.Return(run) return _c } @@ -170,9 +169,9 @@ func (_c *EncodedDocument_Key_Call) RunAndReturn(run func() []byte) *EncodedDocu return _c } -// Reset provides a mock function with given fields: newKey -func (_m *EncodedDocument) Reset(newKey []byte) { - _m.Called(newKey) +// Reset provides a mock function with given fields: +func (_m *EncodedDocument) Reset() { + _m.Called() } // EncodedDocument_Reset_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Reset' @@ -181,14 +180,13 @@ type EncodedDocument_Reset_Call struct { } // Reset is a helper method to define mock.On call -// - newKey []byte -func (_e *EncodedDocument_Expecter) Reset(newKey interface{}) *EncodedDocument_Reset_Call { - return &EncodedDocument_Reset_Call{Call: _e.mock.On("Reset", newKey)} +func (_e *EncodedDocument_Expecter) Reset() *EncodedDocument_Reset_Call { + return &EncodedDocument_Reset_Call{Call: _e.mock.On("Reset")} } -func (_c *EncodedDocument_Reset_Call) Run(run func(newKey []byte)) *EncodedDocument_Reset_Call { +func (_c *EncodedDocument_Reset_Call) Run(run func()) *EncodedDocument_Reset_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]byte)) + run() }) return _c } @@ -198,7 +196,7 @@ func (_c *EncodedDocument_Reset_Call) Return() *EncodedDocument_Reset_Call { return _c } -func (_c *EncodedDocument_Reset_Call) RunAndReturn(run func([]byte)) *EncodedDocument_Reset_Call { +func (_c *EncodedDocument_Reset_Call) RunAndReturn(run func()) *EncodedDocument_Reset_Call { _c.Call.Return(run) return _c } diff --git a/db/fetcher/mocks/Fetcher.go b/db/fetcher/mocks/Fetcher.go index 3fa8c60fb7..6c1112f019 100644 --- a/db/fetcher/mocks/Fetcher.go +++ b/db/fetcher/mocks/Fetcher.go @@ -13,6 +13,8 @@ import ( fetcher "github.com/sourcenetwork/defradb/db/fetcher" + mapper "github.com/sourcenetwork/defradb/planner/mapper" + mock "github.com/stretchr/testify/mock" ) @@ -240,13 +242,13 @@ func (_c *Fetcher_FetchNextDoc_Call) RunAndReturn(run func(context.Context, *cor return _c } -// Init provides a mock function with given fields: col, fields, reverse, showDeleted -func (_m *Fetcher) Init(col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, showDeleted bool) error { - ret := _m.Called(col, fields, reverse, showDeleted) +// Init provides a mock function with given fields: col, fields, filter, docmapper, reverse, showDeleted +func (_m *Fetcher) Init(col *client.CollectionDescription, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool) error { + ret := _m.Called(col, fields, filter, docmapper, reverse, showDeleted) var r0 error - if rf, ok := ret.Get(0).(func(*client.CollectionDescription, []*client.FieldDescription, bool, bool) error); ok { - r0 = rf(col, fields, reverse, showDeleted) + if rf, ok := ret.Get(0).(func(*client.CollectionDescription, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error); ok { + r0 = rf(col, fields, filter, 
docmapper, reverse, showDeleted) } else { r0 = ret.Error(0) } @@ -261,16 +263,18 @@ type Fetcher_Init_Call struct { // Init is a helper method to define mock.On call // - col *client.CollectionDescription -// - fields []*client.FieldDescription +// - fields []client.FieldDescription +// - filter *mapper.Filter +// - docmapper *core.DocumentMapping // - reverse bool // - showDeleted bool -func (_e *Fetcher_Expecter) Init(col interface{}, fields interface{}, reverse interface{}, showDeleted interface{}) *Fetcher_Init_Call { - return &Fetcher_Init_Call{Call: _e.mock.On("Init", col, fields, reverse, showDeleted)} +func (_e *Fetcher_Expecter) Init(col interface{}, fields interface{}, filter interface{}, docmapper interface{}, reverse interface{}, showDeleted interface{}) *Fetcher_Init_Call { + return &Fetcher_Init_Call{Call: _e.mock.On("Init", col, fields, filter, docmapper, reverse, showDeleted)} } -func (_c *Fetcher_Init_Call) Run(run func(col *client.CollectionDescription, fields []*client.FieldDescription, reverse bool, showDeleted bool)) *Fetcher_Init_Call { +func (_c *Fetcher_Init_Call) Run(run func(col *client.CollectionDescription, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool)) *Fetcher_Init_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*client.CollectionDescription), args[1].([]*client.FieldDescription), args[2].(bool), args[3].(bool)) + run(args[0].(*client.CollectionDescription), args[1].([]client.FieldDescription), args[2].(*mapper.Filter), args[3].(*core.DocumentMapping), args[4].(bool), args[5].(bool)) }) return _c } @@ -280,7 +284,7 @@ func (_c *Fetcher_Init_Call) Return(_a0 error) *Fetcher_Init_Call { return _c } -func (_c *Fetcher_Init_Call) RunAndReturn(run func(*client.CollectionDescription, []*client.FieldDescription, bool, bool) error) *Fetcher_Init_Call { +func (_c *Fetcher_Init_Call) RunAndReturn(run func(*client.CollectionDescription, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error) *Fetcher_Init_Call { _c.Call.Return(run) return _c } diff --git a/db/fetcher/mocks/utils.go b/db/fetcher/mocks/utils.go index 85412e9170..0aa5fb4d57 100644 --- a/db/fetcher/mocks/utils.go +++ b/db/fetcher/mocks/utils.go @@ -21,7 +21,14 @@ import ( func NewStubbedFetcher(t *testing.T) *Fetcher { f := NewFetcher(t) - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Maybe().Return(nil) + f.EXPECT().Init( + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Maybe().Return(nil) f.EXPECT().Start(mock.Anything, mock.Anything, mock.Anything).Maybe().Return(nil) f.EXPECT().FetchNext(mock.Anything).Maybe().Return(nil, nil) f.EXPECT().FetchNextDoc(mock.Anything, mock.Anything).Maybe(). diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index 8fd8e4245c..e131e9ec34 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -27,6 +27,7 @@ import ( "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/merkle/crdt" + "github.com/sourcenetwork/defradb/planner/mapper" ) var ( @@ -99,7 +100,9 @@ type VersionedFetcher struct { // Init initializes the VersionedFetcher. 
func (vf *VersionedFetcher) Init( col *client.CollectionDescription, - fields []*client.FieldDescription, + fields []client.FieldDescription, + filter *mapper.Filter, + docmapper *core.DocumentMapping, reverse bool, showDeleted bool, ) error { @@ -109,7 +112,7 @@ func (vf *VersionedFetcher) Init( // run the DF init, VersionedFetchers only supports the Primary (0) index vf.DocumentFetcher = new(DocumentFetcher) - return vf.DocumentFetcher.Init(col, fields, reverse, showDeleted) + return vf.DocumentFetcher.Init(col, fields, filter, docmapper, reverse, showDeleted) } // Start serializes the correct state according to the Key and CID. diff --git a/db/fetcher_test.go b/db/fetcher_test.go index af6613373f..a153510d5b 100644 --- a/db/fetcher_test.go +++ b/db/fetcher_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" @@ -53,7 +54,7 @@ func newTestCollectionDescription() client.CollectionDescription { func newTestFetcher() (*fetcher.DocumentFetcher, error) { df := new(fetcher.DocumentFetcher) desc := newTestCollectionDescription() - err := df.Init(&desc, nil, false, false) + err := df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) if err != nil { return nil, err } @@ -133,7 +134,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocSingle(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false, false) + err = df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, txn, core.Spans{}) @@ -178,7 +179,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocMultiple(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false, false) + err = df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, txn, core.Spans{}) @@ -210,7 +211,7 @@ func TestFetcherGetAllPrimaryIndexDecodedSingle(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false, false) + err = df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) assert.NoError(t, err) txn, err := db.NewTxn(ctx, true) @@ -224,7 +225,7 @@ func TestFetcherGetAllPrimaryIndexDecodedSingle(t *testing.T) { ddoc, err := df.FetchNextDecoded(ctx) assert.NoError(t, err) - assert.NotNil(t, ddoc) + require.NotNil(t, ddoc) // value check name, err := ddoc.Get("Name") @@ -262,7 +263,7 @@ func TestFetcherGetAllPrimaryIndexDecodedMultiple(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false, false) + err = df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) assert.NoError(t, err) txn, err := db.NewTxn(ctx, true) @@ -319,7 +320,7 @@ func TestFetcherGetOnePrimaryIndexDecoded(t *testing.T) { df := new(fetcher.DocumentFetcher) desc := col.Description() - err = df.Init(&desc, nil, false, false) + err = df.Init(&desc, desc.Schema.Fields, nil, nil, false, false) assert.NoError(t, err) // create a span for our document we wish to find diff --git a/db/indexed_docs_test.go b/db/indexed_docs_test.go index 286633a7a5..4023c4dc7d 100644 --- a/db/indexed_docs_test.go +++ b/db/indexed_docs_test.go @@ -29,6 +29,7 @@ import ( "github.com/sourcenetwork/defradb/datastore/mocks" "github.com/sourcenetwork/defradb/db/fetcher" fetcherMocks "github.com/sourcenetwork/defradb/db/fetcher/mocks" + 
"github.com/sourcenetwork/defradb/planner/mapper" ) type userDoc struct { @@ -545,8 +546,8 @@ func TestNonUniqueCreate_IfUponIndexingExistingDocsFetcherFails_ReturnError(t *t Name: "Fails to init", PrepareFetcher: func() fetcher.Fetcher { f := fetcherMocks.NewStubbedFetcher(t) - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testError) + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testError) f.EXPECT().Close().Unset() f.EXPECT().Close().Return(nil) return f @@ -831,8 +832,8 @@ func TestNonUniqueUpdate_IfFetcherFails_ReturnError(t *testing.T) { Name: "Fails to init", PrepareFetcher: func() fetcher.Fetcher { f := fetcherMocks.NewStubbedFetcher(t) - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testError) + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(testError) f.EXPECT().Close().Unset() f.EXPECT().Close().Return(nil) return f @@ -926,11 +927,13 @@ func TestNonUniqueUpdate_ShouldPassToFetcherOnlyRelevantFields(t *testing.T) { f.users.fetcherFactory = func() fetcher.Fetcher { f := fetcherMocks.NewStubbedFetcher(t) - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() - f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset() + f.EXPECT().Init(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
RunAndReturn(func( col *client.CollectionDescription, - fields []*client.FieldDescription, + fields []client.FieldDescription, + filter *mapper.Filter, + mapping *core.DocumentMapping, reverse, showDeleted bool, ) error { require.Equal(t, 2, len(fields)) diff --git a/go.mod b/go.mod index 3a668b01f7..91bdf2f2f3 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/sourcenetwork/defradb go 1.19 require ( + github.com/bits-and-blooms/bitset v1.7.0 github.com/bxcodec/faker v2.0.1+incompatible github.com/dgraph-io/badger/v3 v3.2103.5 github.com/evanphx/json-patch/v5 v5.6.0 diff --git a/go.sum b/go.sum index 0f0588f5d6..3612ff63cd 100644 --- a/go.sum +++ b/go.sum @@ -84,6 +84,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bits-and-blooms/bitset v1.7.0 h1:YjAGVd3XmtK9ktAbX8Zg2g2PwLIMjGREZJHlV4j7NEo= +github.com/bits-and-blooms/bitset v1.7.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= diff --git a/net/process.go b/net/process.go index 6b14888110..281c7d3145 100644 --- a/net/process.go +++ b/net/process.go @@ -103,7 +103,7 @@ func initCRDTForType( core.COMPOSITE_NAMESPACE, ) } else { - fd, ok := description.GetField(field) + fd, ok := description.Schema.GetField(field) if !ok { return nil, errors.New(fmt.Sprintf("Couldn't find field %s for doc %s", field, docKey)) } diff --git a/planner/commit.go b/planner/commit.go index b3ad59bec9..e6216e2b43 100644 --- a/planner/commit.go +++ b/planner/commit.go @@ -333,7 +333,7 @@ func (n *dagScanNode) dagBlockToNodeDoc(block blocks.Block) (core.Doc, []*ipld.L return core.Doc{}, nil, err } - field, ok := c.Description().GetField(fieldName.(string)) + field, ok := c.Description().Schema.GetField(fieldName.(string)) if !ok { return core.Doc{}, nil, client.NewErrFieldNotExist(fieldName.(string)) } diff --git a/planner/datasource.go b/planner/datasource.go index 2dea8290c5..afcfbab3ce 100644 --- a/planner/datasource.go +++ b/planner/datasource.go @@ -35,15 +35,13 @@ func (p *Planner) getSource(parsed *mapper.Select) (planSource, error) { return p.getCollectionScanPlan(parsed) } -// @todo: Add field selection func (p *Planner) getCollectionScanPlan(parsed *mapper.Select) (planSource, error) { colDesc, err := p.getCollectionDesc(parsed.CollectionName) if err != nil { return planSource{}, err } - scan := p.Scan(parsed) - err = scan.initCollection(colDesc) + scan, err := p.Scan(parsed) if err != nil { return planSource{}, err } diff --git a/planner/mapper/errors.go b/planner/mapper/errors.go index 9d6db3dab0..552021ca94 100644 --- a/planner/mapper/errors.go +++ b/planner/mapper/errors.go @@ -20,6 +20,8 @@ var ( ErrUnableToIdAggregateChild = errors.New("unable to identify aggregate child") ErrAggregateTargetMissing = errors.New("aggregate must be provided with a property to aggregate") ErrFailedToFindHostField = errors.New("failed to find host field") + ErrInvalidFieldIndex = errors.New("given field 
doesn't have any indexes") + ErrMissingSelect = errors.New("missing target select field") ) func NewErrInvalidFieldToGroupBy(field string) error { diff --git a/planner/mapper/mapper.go b/planner/mapper/mapper.go index 799d5c5c9b..c1e12876fe 100644 --- a/planner/mapper/mapper.go +++ b/planner/mapper/mapper.go @@ -25,6 +25,10 @@ import ( "github.com/sourcenetwork/defradb/datastore" ) +var ( + FilterEqOp = &Operator{Operation: "_eq"} +) + // ToSelect converts the given [parser.Select] into a [Select]. // // In the process of doing so it will construct the document map required to access the data @@ -69,8 +73,9 @@ func toSelect( fields = append(fields, filterDependencies...) // Resolve order dependencies that may have been missed due to not being rendered. - if err := resolveOrderDependencies( - descriptionsRepo, collectionName, selectRequest.OrderBy, mapping, &fields); err != nil { + err = resolveOrderDependencies( + descriptionsRepo, collectionName, selectRequest.OrderBy, mapping, &fields) + if err != nil { return nil, err } @@ -83,6 +88,7 @@ func toSelect( desc, descriptionsRepo, ) + if err != nil { return nil, err } @@ -92,15 +98,10 @@ func toSelect( groupByFields := selectRequest.GroupBy.Value().Fields // Remap all alias field names to use their internal field name mappings. for index, groupByField := range groupByFields { - if _, fieldHasMapping := mapping.IndexesByName[groupByField]; fieldHasMapping { - // Not an alias as is already mapped. - continue - } else if _, isAlias := mapping.IndexesByName[groupByField+request.RelatedObjectID]; isAlias { - // Remap the alias to it's actual internal name. + fieldDesc, ok := desc.Schema.GetField(groupByField) + if ok && fieldDesc.IsObject() && !fieldDesc.IsObjectArray() { groupByFields[index] = groupByField + request.RelatedObjectID - } else { - // Field is not mapped nor is an alias, then is invalid field to group on. This can be - // incase of when an alias might have been used on groupBy relation from the single side. + } else if ok && fieldDesc.IsObjectArray() { return nil, NewErrInvalidFieldToGroupBy(groupByField) } } @@ -140,38 +141,103 @@ func resolveOrderDependencies( return nil } + currentExistingFields := existingFields // If there is orderby, and any one of the condition fields that are join fields and have not been // requested, we need to map them here. +outer: for _, condition := range source.Value().Conditions { - if len(condition.Fields) <= 1 { - continue - } - - joinField := condition.Fields[0] + fields := condition.Fields[:] // copy slice + for { + numFields := len(fields) + // <2 fields: Direct field on the root type: {age: DESC} + // 2 fields: Single depth related type: {author: {age: DESC}} + // >2 fields: Multi depth related type: {author: {friends: {age: DESC}}} + if numFields == 2 { + joinField := fields[0] + + // ensure the child select is resolved for this order join + innerSelect, err := resolveChildOrder(descriptionsRepo, descName, joinField, mapping, currentExistingFields) + if err != nil { + return err + } - // Check if the join field is already mapped, if not then map it. 
-		if isOrderJoinFieldMapped := len(mapping.IndexesByName[joinField]) != 0; !isOrderJoinFieldMapped {
-			index := mapping.GetNextIndex()
-			mapping.Add(index, joinField)
+				// make sure the actual target field inside the join field
+				// is included in the select
+				targetFieldName := fields[1]
+				targetField := &Field{
+					Index: innerSelect.FirstIndexOfName(targetFieldName),
+					Name:  targetFieldName,
+				}
+				innerSelect.Fields = append(innerSelect.Fields, targetField)
+				continue outer
+			} else if numFields > 2 {
+				joinField := fields[0]
 
-			// Resolve the inner child fields and get it's mapping.
-			dummyJoinFieldSelect := request.Select{
-				Field: request.Field{
-					Name: joinField,
-				},
-			}
-			innerSelect, err := toSelect(descriptionsRepo, index, &dummyJoinFieldSelect, descName)
-			if err != nil {
-				return err
+				// ensure the child select is resolved for this order join
+				innerSelect, err := resolveChildOrder(descriptionsRepo, descName, joinField, mapping, existingFields)
+				if err != nil {
+					return err
+				}
+				mapping = innerSelect.DocumentMapping
+				currentExistingFields = &innerSelect.Fields
+				fields = fields[1:] // chop off the front item, and loop again on inner
+			} else { // <= 1
+				targetFieldName := fields[0]
+				*existingFields = append(*existingFields, &Field{
+					Index: mapping.FirstIndexOfName(targetFieldName),
+					Name:  targetFieldName,
+				})
+				// nothing to do, continue the outer for loop
+				continue outer
 			}
-			*existingFields = append(*existingFields, innerSelect)
-			mapping.SetChildAt(index, innerSelect.DocumentMapping)
 		}
 	}
 	return nil
 }
 
+// given a type join field, ensure its mapping exists
+// and add the corresponding select field(s)
+func resolveChildOrder(
+	descriptionsRepo *DescriptionsRepo,
+	descName string,
+	orderChildField string,
+	mapping *core.DocumentMapping,
+	existingFields *[]Requestable,
+) (*Select, error) {
+	childFieldIndexes := mapping.IndexesByName[orderChildField]
+	// Check if the join field is already mapped, if not then map it.
+	if len(childFieldIndexes) == 0 {
+		index := mapping.GetNextIndex()
+		mapping.Add(index, orderChildField)
+
+		// Resolve the inner child fields and get its mapping.
+		dummyJoinFieldSelect := request.Select{
+			Field: request.Field{
+				Name: orderChildField,
+			},
+		}
+		innerSelect, err := toSelect(descriptionsRepo, index, &dummyJoinFieldSelect, descName)
+		if err != nil {
+			return nil, err
+		}
+		*existingFields = append(*existingFields, innerSelect)
+		mapping.SetChildAt(index, innerSelect.DocumentMapping)
+		return innerSelect, nil
+	} else {
+		for _, field := range *existingFields {
+			fieldSelect, ok := field.(*Select)
+			if !ok {
+				continue
+			}
+			if fieldSelect.Field.Name == orderChildField {
+				return fieldSelect, nil
+			}
+		}
+	}
+	return nil, ErrMissingSelect
+}
+
 // resolveAggregates figures out which fields the given aggregates are targeting
 // and converts the aggregateRequest into an Aggregate, appending it onto the given
 // fields slice.
@@ -189,7 +255,6 @@ func resolveAggregates( ) ([]Requestable, error) { fields := inputFields dependenciesByParentId := map[int][]int{} - for _, aggregate := range aggregates { aggregateTargets := make([]AggregateTarget, len(aggregate.targets)) @@ -205,7 +270,7 @@ func resolveAggregates( var hasHost bool var convertedFilter *Filter if childIsMapped { - fieldDesc, isField := desc.GetField(target.hostExternalName) + fieldDesc, isField := desc.Schema.GetField(target.hostExternalName) if isField && !fieldDesc.IsObject() { var order *OrderBy if target.order.HasValue() && len(target.order.Value().Conditions) > 0 { @@ -228,14 +293,14 @@ func resolveAggregates( Index: int(fieldDesc.ID), Name: target.hostExternalName, }, - Filter: ToFilter(target.filter, mapping), + Filter: ToFilter(target.filter.Value(), mapping), Limit: target.limit, OrderBy: order, } } else { childObjectIndex := mapping.FirstIndexOfName(target.hostExternalName) childMapping := mapping.ChildMappings[childObjectIndex] - convertedFilter = ToFilter(target.filter, childMapping) + convertedFilter = ToFilter(target.filter.Value(), childMapping) host, hasHost = tryGetTarget( target.hostExternalName, convertedFilter, @@ -261,7 +326,6 @@ func resolveAggregates( if err != nil { return nil, err } - mapAggregateNestedTargets(target, hostSelectRequest, selectRequest.Root) childMapping, childDesc, err := getTopLevelInfo(descriptionsRepo, hostSelectRequest, childCollectionName) @@ -274,13 +338,19 @@ func resolveAggregates( return nil, err } + err = resolveOrderDependencies( + descriptionsRepo, childCollectionName, target.order, childMapping, &childFields) + if err != nil { + return nil, err + } + childMapping = childMapping.CloneWithoutRender() mapping.SetChildAt(index, childMapping) if !childIsMapped { // If the child was not mapped, the filter will not have been converted yet // so we must do that now. - convertedFilter = ToFilter(target.filter, mapping.ChildMappings[index]) + convertedFilter = ToFilter(target.filter.Value(), mapping.ChildMappings[index]) } dummyJoin := &Select{ @@ -331,6 +401,12 @@ func resolveAggregates( return nil, ErrUnableToIdAggregateChild } + // ensure target aggregate field is included in the type join + hostSelect.Fields = append(hostSelect.Fields, &Field{ + Index: hostSelect.DocumentMapping.FirstIndexOfName(target.childExternalName), + Name: target.childExternalName, + }) + childTarget = OptionalChildTarget{ // If there are multiple children of the same name there is no way // for us (or the consumer) to identify which one they are hoping for @@ -617,7 +693,7 @@ func getCollectionName( return "", err } - hostFieldDesc, parentHasField := parentDescription.GetField(selectRequest.Name) + hostFieldDesc, parentHasField := parentDescription.Schema.GetField(selectRequest.Name) if parentHasField && hostFieldDesc.RelationType != 0 { // If this field exists on the parent, and it is a child object // then this collection name is the collection name of the child. 
@@ -766,7 +842,13 @@ func resolveInnerFilterDependencies(
 
 		if keyIndex >= len(mapping.ChildMappings) {
 			// If the key index is outside the bounds of the child mapping array, then
-			// this is not a relation/join and we can continue (no child props to process)
+			// this is not a relation/join and we can add it to the fields and
+			// continue (no child props to process)
+			newFields = append(existingFields, &Field{
+				Index: keyIndex,
+				Name:  key,
+			})
+
 			continue
 		}
 
@@ -883,7 +965,7 @@ func toTargetable(index int, selectRequest *request.Select, docMap *core.Documen
 	return Targetable{
 		Field:   toField(index, selectRequest),
 		DocKeys: selectRequest.DocKeys,
-		Filter:  ToFilter(selectRequest.Filter, docMap),
+		Filter:  ToFilter(selectRequest.Filter.Value(), docMap),
 		Limit:   toLimit(selectRequest.Limit, selectRequest.Offset),
 		GroupBy: toGroupBy(selectRequest.GroupBy, docMap),
 		OrderBy: toOrderBy(selectRequest.OrderBy, docMap),
@@ -901,20 +983,20 @@ func toField(index int, selectRequest *request.Select) Field {
 
 // ToFilter converts the given `source` request filter to a Filter using the given mapping.
 //
 // Any requestables identified by name will be converted to being identified by index instead.
-func ToFilter(source immutable.Option[request.Filter], mapping *core.DocumentMapping) *Filter {
-	if !source.HasValue() {
+func ToFilter(source request.Filter, mapping *core.DocumentMapping) *Filter {
+	if len(source.Conditions) == 0 {
 		return nil
 	}
-	conditions := make(map[connor.FilterKey]any, len(source.Value().Conditions))
+	conditions := make(map[connor.FilterKey]any, len(source.Conditions))
 
-	for sourceKey, sourceClause := range source.Value().Conditions {
+	for sourceKey, sourceClause := range source.Conditions {
 		key, clause := toFilterMap(sourceKey, sourceClause, mapping)
 		conditions[key] = clause
 	}
 
 	return &Filter{
 		Conditions:         conditions,
-		ExternalConditions: source.Value().Conditions,
+		ExternalConditions: source.Conditions,
 	}
 }
diff --git a/planner/mapper/targetable.go b/planner/mapper/targetable.go
index 36047b9a47..e18d1f7321 100644
--- a/planner/mapper/targetable.go
+++ b/planner/mapper/targetable.go
@@ -98,13 +98,14 @@ func filterObjectToMap(mapping *core.DocumentMapping, obj map[connor.FilterKey]a
 	for k, v := range obj {
 		switch keyType := k.(type) {
 		case *PropertyIndex:
 			subObj := v.(map[connor.FilterKey]any)
 			outkey, _ := mapping.TryToFindNameFromIndex(keyType.Index)
-			if childMapping, ok := tryGetChildMapping(mapping, keyType.Index); !ok {
-				outmap[outkey] = filterObjectToMap(mapping, subObj)
-			} else {
+			childMapping, ok := tryGetChildMapping(mapping, keyType.Index)
+			if ok {
 				outmap[outkey] = filterObjectToMap(childMapping, subObj)
+			} else {
+				outmap[outkey] = filterObjectToMap(mapping, subObj)
 			}
 
 		case *Operator:
diff --git a/planner/planner.go b/planner/planner.go
index fb6d325123..fc2b344cda 100644
--- a/planner/planner.go
+++ b/planner/planner.go
@@ -525,7 +525,11 @@ func (p *Planner) RunSubscriptionRequest(
 		return nil, err
 	}
 
-	return p.executeRequest(ctx, planNode)
+	data, err := p.executeRequest(ctx, planNode)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
 }
 
 // MakePlan makes a plan from the parsed request.
diff --git a/planner/scan.go b/planner/scan.go
index 5fe4b3c047..36bc426e22 100644
--- a/planner/scan.go
+++ b/planner/scan.go
@@ -17,6 +17,7 @@ import (
 	"github.com/sourcenetwork/defradb/db/base"
 	"github.com/sourcenetwork/defradb/db/fetcher"
 	"github.com/sourcenetwork/defradb/planner/mapper"
+	"github.com/sourcenetwork/defradb/request/graphql/parser"
 )
 
 type scanExecInfo struct {
@@ -25,9 +26,6 @@ type scanExecInfo struct {
 
 	// Total number of times attempted to fetch documents.
 	docFetches uint64
-
-	// Total number of documents that matched / passed the filter.
-	filterMatches uint64
 }
 
 // scans an index for records
@@ -38,7 +36,8 @@ type scanNode struct {
 	p    *Planner
 	desc client.CollectionDescription
 
-	fields []*client.FieldDescription
+	fields []client.FieldDescription
+	docKey []byte
 
 	showDeleted bool
 
@@ -46,6 +45,7 @@ type scanNode struct {
 	reverse bool
 
 	filter *mapper.Filter
+	slct   *mapper.Select
 
 	scanInitialized bool
 
@@ -60,7 +60,7 @@ func (n *scanNode) Kind() string {
 
 func (n *scanNode) Init() error {
 	// init the fetcher
-	if err := n.fetcher.Init(&n.desc, n.fields, n.reverse, n.showDeleted); err != nil {
+	if err := n.fetcher.Init(&n.desc, n.fields, n.filter, n.slct.DocumentMapping, n.reverse, n.showDeleted); err != nil {
 		return err
 	}
 	return n.initScan()
@@ -68,9 +68,63 @@ func (n *scanNode) Init() error {
 
 func (n *scanNode) initCollection(desc client.CollectionDescription) error {
 	n.desc = desc
+	return n.initFields(n.slct.Fields)
+}
+
+func (n *scanNode) initFields(fields []mapper.Requestable) error {
+	for _, r := range fields {
+		// add all the possible base level fields the fetcher is responsible
+		// for, including those that are needed by higher level aggregates
+		// or grouping alls, which themselves might have further dependents
+		switch requestable := r.(type) {
+		// field is simple as it's just a base level field
+		case *mapper.Field:
+			n.tryAddField(requestable.GetName())
+		// select might have its own select fields and filter fields
+		case *mapper.Select:
+			n.tryAddField(requestable.Field.Name + "_id") // foreign key for type joins
+			err := n.initFields(requestable.Fields)
+			if err != nil {
+				return err
+			}
+		// aggregate might have its own target fields and filter fields
+		case *mapper.Aggregate:
+			for _, target := range requestable.AggregateTargets {
+				if target.Filter != nil {
+					fieldDescs, err := parser.ParseFilterFieldsForDescription(
+						target.Filter.ExternalConditions,
+						n.desc.Schema,
+					)
+					if err != nil {
+						return err
+					}
+					for _, fd := range fieldDescs {
+						n.tryAddField(fd.Name)
+					}
+				}
+				if target.ChildTarget.HasValue {
+					n.tryAddField(target.ChildTarget.Name)
+				} else {
+					n.tryAddField(target.Field.Name)
+				}
+			}
+		}
+	}
 	return nil
 }
 
+func (n *scanNode) tryAddField(fieldName string) bool {
+	fd, ok := n.desc.Schema.GetField(fieldName)
+	if !ok {
+		// skip fields that are not part of the
+		// schema description. The scanner (and fetcher)
+		// is only responsible for basic fields
+		return false
+	}
+	n.fields = append(n.fields, fd)
+	return true
+}
+
 // Start starts the internal logic of the scanner
 // like the DocumentFetcher, and more.
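The new `initFields`/`tryAddField` pair above is what makes eager filtering at the fetcher possible: before scanning starts, the scan node collects every base-level field its filter, joins, and aggregates depend on. A minimal sketch of the recursive walk, with hypothetical stand-in types (the real code also consults the schema and skips fields it does not know about):

package main

import "fmt"

type requestable interface{ isRequestable() }

// field is a plain base-level field; sel is a nested (joined) select.
type field struct{ name string }
type sel struct {
	name   string
	fields []requestable
}

func (field) isRequestable() {}
func (sel) isRequestable()   {}

// collect walks the requested tree and records every field name the
// fetcher must load, including the implicit "<relation>_id" foreign
// key a type join reads.
func collect(fields []requestable, out map[string]struct{}) {
	for _, r := range fields {
		switch v := r.(type) {
		case field:
			out[v.name] = struct{}{}
		case sel:
			out[v.name+"_id"] = struct{}{}
			collect(v.fields, out)
		}
	}
}

func main() {
	out := map[string]struct{}{}
	collect([]requestable{
		field{name: "name"},
		sel{name: "author", fields: []requestable{field{name: "age"}}},
	}, out)
	fmt.Println(len(out)) // 3: name, author_id, age
}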
func (n *scanNode) Start() error { @@ -102,32 +156,24 @@ func (n *scanNode) Next() (bool, error) { return false, nil } - // keep scanning until we find a doc that passes the filter - for { - var err error - _, n.currentValue, err = n.fetcher.FetchNextDoc(n.p.ctx, n.documentMapping) - if err != nil { - return false, err - } - n.execInfo.docFetches++ + var err error + n.docKey, n.currentValue, err = n.fetcher.FetchNextDoc(n.p.ctx, n.documentMapping) + if err != nil { + return false, err + } + n.execInfo.docFetches++ - if len(n.currentValue.Fields) == 0 { - return false, nil - } - n.documentMapping.SetFirstOfName( - &n.currentValue, - request.DeletedFieldName, - n.currentValue.Status.IsDeleted(), - ) - passed, err := mapper.RunFilter(n.currentValue, n.filter) - if err != nil { - return false, err - } - if passed { - n.execInfo.filterMatches++ - return true, nil - } + if len(n.currentValue.Fields) == 0 { + return false, nil } + + n.documentMapping.SetFirstOfName( + &n.currentValue, + request.DeletedFieldName, + n.currentValue.Status.IsDeleted(), + ) + + return true, nil } func (n *scanNode) Spans(spans core.Spans) { @@ -177,9 +223,8 @@ func (n *scanNode) simpleExplain() (map[string]any, error) { func (n *scanNode) excuteExplain() map[string]any { return map[string]any{ - "iterations": n.execInfo.iterations, - "docFetches": n.execInfo.docFetches, - "filterMatches": n.execInfo.filterMatches, + "iterations": n.execInfo.iterations, + "docFetches": n.execInfo.docFetches, } } @@ -201,18 +246,29 @@ func (n *scanNode) Explain(explainType request.ExplainType) (map[string]any, err // Merge implements mergeNode func (n *scanNode) Merge() bool { return true } -func (p *Planner) Scan(parsed *mapper.Select) *scanNode { +func (p *Planner) Scan(parsed *mapper.Select) (*scanNode, error) { var f fetcher.Fetcher if parsed.Cid.HasValue() { f = new(fetcher.VersionedFetcher) } else { f = new(fetcher.DocumentFetcher) } - return &scanNode{ + scan := &scanNode{ p: p, fetcher: f, + slct: parsed, docMapper: docMapper{parsed.DocumentMapping}, } + + colDesc, err := p.getCollectionDesc(parsed.CollectionName) + if err != nil { + return nil, err + } + err = scan.initCollection(colDesc) + if err != nil { + return nil, err + } + return scan, nil } // multiScanNode is a buffered scanNode that has diff --git a/planner/sum.go b/planner/sum.go index c1f8ca84bf..0e1690898e 100644 --- a/planner/sum.go +++ b/planner/sum.go @@ -82,7 +82,7 @@ func (p *Planner) isValueFloat( return false, err } - fieldDescription, fieldDescriptionFound := parentDescription.GetField(source.Name) + fieldDescription, fieldDescriptionFound := parentDescription.Schema.GetField(source.Name) if !fieldDescriptionFound { return false, client.NewErrFieldNotExist(source.Name) } @@ -130,7 +130,7 @@ func (p *Planner) isValueFloat( return false, err } - fieldDescription, fieldDescriptionFound := childCollectionDescription.GetField(source.ChildTarget.Name) + fieldDescription, fieldDescriptionFound := childCollectionDescription.Schema.GetField(source.ChildTarget.Name) if !fieldDescriptionFound { return false, client.NewErrFieldNotExist(source.ChildTarget.Name) } diff --git a/planner/top.go b/planner/top.go index d887d9e9e7..93e530b2fc 100644 --- a/planner/top.go +++ b/planner/top.go @@ -209,6 +209,7 @@ func (p *Planner) Top(m *mapper.Select) (*topLevelNode, error) { } aggregateChildren = append(aggregateChildren, child) aggregateChildIndexes = append(aggregateChildIndexes, field.GetIndex()) + case *mapper.Select: child, err := p.Select(f) if err != nil { diff --git 
a/planner/type_join.go b/planner/type_join.go
index e50a165aa6..5a64082a32 100644
--- a/planner/type_join.go
+++ b/planner/type_join.go
@@ -79,7 +79,7 @@ func (p *Planner) makeTypeIndexJoin(
 	var err error
 
 	desc := parent.sourceInfo.collectionDescription
-	typeFieldDesc, ok := desc.GetField(subType.Name)
+	typeFieldDesc, ok := desc.Schema.GetField(subType.Name)
 	if !ok {
 		return nil, client.NewErrFieldNotExist(subType.Name)
 	}
@@ -286,7 +286,7 @@ func (p *Planner) makeTypeJoinOne(
 	}
 
 	// get the correct sub field schema type (collection)
-	subTypeFieldDesc, ok := parent.sourceInfo.collectionDescription.GetField(subType.Name)
+	subTypeFieldDesc, ok := parent.sourceInfo.collectionDescription.Schema.GetField(subType.Name)
 	if !ok {
 		return nil, client.NewErrFieldNotExist(subType.Name)
 	}
@@ -359,13 +359,9 @@ func (n *typeJoinOne) valuesSecondary(doc core.Doc) core.Doc {
 		Index: n.subType.DocumentMap().FirstIndexOfName(n.subTypeFieldName + request.RelatedObjectID),
 	}
 	filter := map[connor.FilterKey]any{
-		fkIndex: doc.GetKey(),
-	}
-
-	// We have to reset the scan node after appending the new key-filter
-	if err := n.subType.Init(); err != nil {
-		log.ErrorE(n.p.ctx, "Sub-type initialization error at scan node reset", err)
-		return doc
+		fkIndex: map[connor.FilterKey]any{
+			mapper.FilterEqOp: doc.GetKey(),
+		},
 	}
 
 	// using the doc._key as a filter
@@ -374,6 +370,12 @@ func (n *typeJoinOne) valuesSecondary(doc core.Doc) core.Doc {
 		return core.Doc{}
 	}
 
+	// We have to reset the scan node after appending the new key-filter
+	if err := n.subType.Init(); err != nil {
+		log.ErrorE(n.p.ctx, "Sub-type initialization error at scan node reset", err)
+		return doc
+	}
+
 	next, err := n.subType.Next()
 	if !next || err != nil {
 		return doc
@@ -482,7 +484,7 @@ func (p *Planner) makeTypeJoinMany(
 		return nil, err
 	}
 
-	subTypeFieldDesc, ok := parent.sourceInfo.collectionDescription.GetField(subType.Name)
+	subTypeFieldDesc, ok := parent.sourceInfo.collectionDescription.Schema.GetField(subType.Name)
 	if !ok {
 		return nil, client.NewErrFieldNotExist(subType.Name)
 	}
@@ -549,8 +551,11 @@ func (n *typeJoinMany) Next() (bool, error) {
 		Index: n.subSelect.FirstIndexOfName(n.rootName + request.RelatedObjectID),
 	}
 	filter := map[connor.FilterKey]any{
-		fkIndex: n.currentValue.GetKey(), // user_id: "bae-ALICE" | user_id: "bae-CHARLIE"
+		fkIndex: map[connor.FilterKey]any{
+			mapper.FilterEqOp: n.currentValue.GetKey(),
+		},
 	}
+
 	// using the doc._key as a filter
 	err := appendFilterToScanNode(n.subType, filter)
 	if err != nil {
diff --git a/request/graphql/parser/errors.go b/request/graphql/parser/errors.go
index 6d014afcce..c629f11c19 100644
--- a/request/graphql/parser/errors.go
+++ b/request/graphql/parser/errors.go
@@ -23,4 +23,5 @@ var (
 	ErrInvalidNumberOfExplainArgs = errors.New("invalid number of arguments to an explain request")
 	ErrUnknownExplainType         = errors.New("invalid / unknown explain type")
 	ErrUnknownGQLOperation        = errors.New("unknown GraphQL operation type")
+	ErrInvalidFilterConditions    = errors.New("invalid filter condition type, expected map")
 )
diff --git a/request/graphql/parser/filter.go b/request/graphql/parser/filter.go
index 7afb626372..46119070b4 100644
--- a/request/graphql/parser/filter.go
+++ b/request/graphql/parser/filter.go
@@ -190,6 +190,67 @@ func parseVal(val ast.Value, recurseFn parseFn) (any, error) {
 	return nil, ErrFailedToParseConditionValue
 }
 
+// ParseFilterFieldsForDescription parses the fields that are defined in the SchemaDescription
+// from the filter conditions.
+func ParseFilterFieldsForDescription(
+	conditions map[string]any,
+	schema client.SchemaDescription,
+) ([]client.FieldDescription, error) {
+	return parseFilterFieldsForDescriptionMap(conditions, schema)
+}
+
+func parseFilterFieldsForDescriptionMap(
+	conditions map[string]any,
+	schema client.SchemaDescription,
+) ([]client.FieldDescription, error) {
+	fields := make([]client.FieldDescription, 0)
+	for k, v := range conditions {
+		switch k {
+		case "_or", "_and":
+			conds := v.([]any)
+			parsedFields, err := parseFilterFieldsForDescriptionSlice(conds, schema)
+			if err != nil {
+				return nil, err
+			}
+			fields = append(fields, parsedFields...)
+		case "_not":
+			conds := v.(map[string]any)
+			parsedFields, err := parseFilterFieldsForDescriptionMap(conds, schema)
+			if err != nil {
+				return nil, err
+			}
+			fields = append(fields, parsedFields...)
+		default:
+			f, found := schema.GetField(k)
+			if !found || f.IsObject() {
+				continue
+			}
+			fields = append(fields, f)
+		}
+	}
+	return fields, nil
+}
+
+func parseFilterFieldsForDescriptionSlice(
+	conditions []any,
+	schema client.SchemaDescription,
+) ([]client.FieldDescription, error) {
+	fields := make([]client.FieldDescription, 0)
+	for _, v := range conditions {
+		switch cond := v.(type) {
+		case map[string]any:
+			parsedFields, err := parseFilterFieldsForDescriptionMap(cond, schema)
+			if err != nil {
+				return nil, err
+			}
+			fields = append(fields, parsedFields...)
+		default:
+			return nil, ErrInvalidFilterConditions
+		}
+	}
+	return fields, nil
+}
+
 /*
 userCollection := db.getCollection("users")
 doc := userCollection.NewFromJSON("{
diff --git a/tests/integration/explain/default/delete_test.go b/tests/integration/explain/default/delete_test.go
index 43c6e7f0b0..71f454b6e7 100644
--- a/tests/integration/explain/default/delete_test.go
+++ b/tests/integration/explain/default/delete_test.go
@@ -111,7 +111,7 @@ func TestDefaultExplainMutationRequestWithDeleteUsingFilterToMatchEverything(t *
 			TargetNodeName:    "deleteNode",
 			IncludeChildNodes: false,
 			ExpectedAttributes: dataMap{
-				"filter": dataMap{},
+				"filter": nil,
 				"ids":    []string(nil),
 			},
 		},
@@ -122,7 +122,7 @@ func TestDefaultExplainMutationRequestWithDeleteUsingFilterToMatchEverything(t *
 			ExpectedAttributes: dataMap{
 				"collectionID":   "3",
 				"collectionName": "Author",
-				"filter":         dataMap{},
+				"filter":         nil,
 				"spans": []dataMap{
 					{
 						"end": "/4",
diff --git a/tests/integration/explain/execute/create_test.go b/tests/integration/explain/execute/create_test.go
index 2c233b6358..1b4dcbedad 100644
--- a/tests/integration/explain/execute/create_test.go
+++ b/tests/integration/explain/execute/create_test.go
@@ -45,9 +45,8 @@ func TestExecuteExplainMutationRequestWithCreate(t *testing.T) {
 					"iterations":    uint64(1),
 					"filterMatches": uint64(1),
 					"scanNode": dataMap{
-						"iterations":    uint64(1),
-						"docFetches":    uint64(1),
-						"filterMatches": uint64(1),
+						"iterations": uint64(1),
+						"docFetches": uint64(1),
 					},
 				},
 			},
diff --git a/tests/integration/explain/execute/delete_test.go b/tests/integration/explain/execute/delete_test.go
index 265844bed3..d7734e6c0d 100644
--- a/tests/integration/explain/execute/delete_test.go
+++ b/tests/integration/explain/execute/delete_test.go
@@ -48,9 +48,8 @@ func TestExecuteExplainMutationRequestWithDeleteUsingID(t *testing.T) {
 					"iterations":    uint64(2),
 					"filterMatches": uint64(1),
 					"scanNode": dataMap{
-						"iterations":    uint64(2),
-						"docFetches":    uint64(2),
-						"filterMatches": uint64(1),
+						"iterations": uint64(2),
+						"docFetches": uint64(2),
 					},
 				},
 			},
@@ -96,9 +95,8 @@ func TestExecuteExplainMutationRequestWithDeleteUsingFilter(t *testing.T) {
"iterations": uint64(2), "filterMatches": uint64(1), "scanNode": dataMap{ - "iterations": uint64(2), - "docFetches": uint64(3), - "filterMatches": uint64(1), + "iterations": uint64(2), + "docFetches": uint64(2), }, }, }, diff --git a/tests/integration/explain/execute/group_test.go b/tests/integration/explain/execute/group_test.go index 534587a2e1..c40b43ef6a 100644 --- a/tests/integration/explain/execute/group_test.go +++ b/tests/integration/explain/execute/group_test.go @@ -56,9 +56,8 @@ func TestExecuteExplainRequestWithGroup(t *testing.T) { "iterations": uint64(3), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(4), - "docFetches": uint64(4), - "filterMatches": uint64(2), + "iterations": uint64(4), + "docFetches": uint64(4), }, }, }, diff --git a/tests/integration/explain/execute/scan_test.go b/tests/integration/explain/execute/scan_test.go index 7942aa870a..d9ac9588f4 100644 --- a/tests/integration/explain/execute/scan_test.go +++ b/tests/integration/explain/execute/scan_test.go @@ -64,9 +64,8 @@ func TestExecuteExplainRequestWithAllDocumentsMatching(t *testing.T) { "iterations": uint64(3), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -106,9 +105,8 @@ func TestExecuteExplainRequestWithNoDocuments(t *testing.T) { "iterations": uint64(1), "filterMatches": uint64(0), "scanNode": dataMap{ - "iterations": uint64(1), - "docFetches": uint64(1), - "filterMatches": uint64(0), + "iterations": uint64(1), + "docFetches": uint64(1), }, }, }, @@ -169,9 +167,8 @@ func TestExecuteExplainRequestWithSomeDocumentsMatching(t *testing.T) { "iterations": uint64(2), "filterMatches": uint64(1), "scanNode": dataMap{ - "iterations": uint64(2), - "docFetches": uint64(3), - "filterMatches": uint64(1), + "iterations": uint64(2), + "docFetches": uint64(2), }, }, }, @@ -232,9 +229,8 @@ func TestExecuteExplainRequestWithDocumentsButNoMatches(t *testing.T) { "iterations": uint64(1), "filterMatches": uint64(0), "scanNode": dataMap{ - "iterations": uint64(1), - "docFetches": uint64(3), - "filterMatches": uint64(0), + "iterations": uint64(1), + "docFetches": uint64(1), }, }, }, diff --git a/tests/integration/explain/execute/top_level_test.go b/tests/integration/explain/execute/top_level_test.go index 0dc9d824f6..2c72ba48d5 100644 --- a/tests/integration/explain/execute/top_level_test.go +++ b/tests/integration/explain/execute/top_level_test.go @@ -67,9 +67,8 @@ func TestExecuteExplainTopLevelAverageRequest(t *testing.T) { "iterations": uint64(3), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -150,9 +149,8 @@ func TestExecuteExplainTopLevelCountRequest(t *testing.T) { "iterations": uint64(3), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -224,9 +222,8 @@ func TestExecuteExplainTopLevelSumRequest(t *testing.T) { "iterations": uint64(3), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/explain/execute/type_join_test.go b/tests/integration/explain/execute/type_join_test.go 
index 1462340276..de02c25170 100644 --- a/tests/integration/explain/execute/type_join_test.go +++ b/tests/integration/explain/execute/type_join_test.go @@ -53,9 +53,8 @@ func TestExecuteExplainRequestWithAOneToOneJoin(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -112,9 +111,8 @@ func TestExecuteExplainWithMultipleOneToOneJoins(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -122,9 +120,8 @@ func TestExecuteExplainWithMultipleOneToOneJoins(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -184,9 +181,8 @@ func TestExecuteExplainWithTwoLevelDeepNestedJoins(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/explain/execute/update_test.go b/tests/integration/explain/execute/update_test.go index 901901e167..7e858ba1ed 100644 --- a/tests/integration/explain/execute/update_test.go +++ b/tests/integration/explain/execute/update_test.go @@ -56,9 +56,8 @@ func TestExecuteExplainMutationRequestWithUpdateUsingIDs(t *testing.T) { "iterations": uint64(6), "filterMatches": uint64(4), "scanNode": dataMap{ - "iterations": uint64(6), - "docFetches": uint64(6), - "filterMatches": uint64(4), + "iterations": uint64(6), + "docFetches": uint64(6), }, }, }, @@ -113,9 +112,8 @@ func TestExecuteExplainMutationRequestWithUpdateUsingFilter(t *testing.T) { "iterations": uint64(4), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(4), - "docFetches": uint64(6), - "filterMatches": uint64(2), + "iterations": uint64(4), + "docFetches": uint64(4), }, }, }, diff --git a/tests/integration/explain/execute/with_average_test.go b/tests/integration/explain/execute/with_average_test.go index c528b4933b..e4701c496c 100644 --- a/tests/integration/explain/execute/with_average_test.go +++ b/tests/integration/explain/execute/with_average_test.go @@ -53,9 +53,8 @@ func TestExecuteExplainAverageRequestOnArrayField(t *testing.T) { "iterations": uint64(4), "filterMatches": uint64(3), "scanNode": dataMap{ - "iterations": uint64(4), - "docFetches": uint64(4), - "filterMatches": uint64(3), + "iterations": uint64(4), + "docFetches": uint64(4), }, }, }, @@ -113,9 +112,8 @@ func TestExplainExplainAverageRequestOnJoinedField(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/explain/execute/with_count_test.go b/tests/integration/explain/execute/with_count_test.go index 607dd78b7a..4680862ba9 100644 --- a/tests/integration/explain/execute/with_count_test.go +++ b/tests/integration/explain/execute/with_count_test.go @@ -54,9 +54,8 @@ func TestExecuteExplainRequestWithCountOnOneToManyRelation(t *testing.T) { "typeIndexJoin": dataMap{ 
"iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/explain/execute/with_limit_test.go b/tests/integration/explain/execute/with_limit_test.go index 5f2555cade..345365cdc1 100644 --- a/tests/integration/explain/execute/with_limit_test.go +++ b/tests/integration/explain/execute/with_limit_test.go @@ -48,9 +48,8 @@ func TestExecuteExplainRequestWithBothLimitAndOffsetOnParent(t *testing.T) { "iterations": uint64(2), "filterMatches": uint64(2), "scanNode": dataMap{ - "iterations": uint64(2), - "docFetches": uint64(2), - "filterMatches": uint64(2), + "iterations": uint64(2), + "docFetches": uint64(2), }, }, }, @@ -104,9 +103,8 @@ func TestExecuteExplainRequestWithBothLimitAndOffsetOnParentAndLimitOnChild(t *t "typeIndexJoin": dataMap{ "iterations": uint64(2), "scanNode": dataMap{ - "iterations": uint64(2), - "docFetches": uint64(2), - "filterMatches": uint64(2), + "iterations": uint64(2), + "docFetches": uint64(2), }, }, }, diff --git a/tests/integration/explain/execute/with_order_test.go b/tests/integration/explain/execute/with_order_test.go index 43ecc2ba9b..58a3b364a4 100644 --- a/tests/integration/explain/execute/with_order_test.go +++ b/tests/integration/explain/execute/with_order_test.go @@ -49,9 +49,8 @@ func TestExecuteExplainRequestWithOrderFieldOnParent(t *testing.T) { "filterMatches": uint64(2), "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -132,9 +131,8 @@ func TestExecuteExplainRequestWithMultiOrderFieldsOnParent(t *testing.T) { "filterMatches": uint64(4), "iterations": uint64(5), "scanNode": dataMap{ - "iterations": uint64(5), - "docFetches": uint64(5), - "filterMatches": uint64(4), + "iterations": uint64(5), + "docFetches": uint64(5), }, }, }, @@ -186,9 +184,8 @@ func TestExecuteExplainRequestWithOrderFieldOnChild(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -243,9 +240,8 @@ func TestExecuteExplainRequestWithOrderFieldOnBothParentAndChild(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, @@ -304,9 +300,8 @@ func TestExecuteExplainRequestWhereParentFieldIsOrderedByChildField(t *testing.T "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/explain/execute/with_sum_test.go b/tests/integration/explain/execute/with_sum_test.go index 43401e7c51..3283bc6776 100644 --- a/tests/integration/explain/execute/with_sum_test.go +++ b/tests/integration/explain/execute/with_sum_test.go @@ -49,9 +49,8 @@ func TestExecuteExplainRequestWithSumOfInlineArrayField(t *testing.T) { "iterations": uint64(4), "filterMatches": uint64(3), "scanNode": dataMap{ - "iterations": uint64(4), - "docFetches": uint64(4), - "filterMatches": uint64(3), + "iterations": uint64(4), + "docFetches": uint64(4), }, }, }, @@ -107,9 +106,8 @@ func 
TestExecuteExplainRequestSumOfRelatedOneToManyField(t *testing.T) { "typeIndexJoin": dataMap{ "iterations": uint64(3), "scanNode": dataMap{ - "iterations": uint64(3), - "docFetches": uint64(3), - "filterMatches": uint64(2), + "iterations": uint64(3), + "docFetches": uint64(3), }, }, }, diff --git a/tests/integration/query/one_to_many/with_group_related_id_alias_test.go b/tests/integration/query/one_to_many/with_group_related_id_alias_test.go index 4f4d1c9a37..8e2223e324 100644 --- a/tests/integration/query/one_to_many/with_group_related_id_alias_test.go +++ b/tests/integration/query/one_to_many/with_group_related_id_alias_test.go @@ -242,7 +242,6 @@ func TestQueryOneToManyWithParentGroupByOnRelatedTypeFromManySideUsingAliasAndRe }, Results: []map[string]any{ { - "author_id": "bae-7accaba8-ea9d-54b1-92f4-4a7ac5de88b3", "author": map[string]any{ "name": "Voltaire", "_key": "bae-7accaba8-ea9d-54b1-92f4-4a7ac5de88b3", @@ -267,7 +266,6 @@ func TestQueryOneToManyWithParentGroupByOnRelatedTypeFromManySideUsingAliasAndRe }, }, { - "author_id": "bae-41598f0c-19bc-5da6-813b-e80f14a10df3", "author": map[string]any{ "name": "John Grisham", "_key": "bae-41598f0c-19bc-5da6-813b-e80f14a10df3", @@ -300,7 +298,6 @@ func TestQueryOneToManyWithParentGroupByOnRelatedTypeFromManySideUsingAliasAndRe }, }, { - "author_id": "bae-09d33399-197a-5b98-b135-4398f2b6de4c", "author": map[string]any{ "name": "Simon Pelloutier", "_key": "bae-09d33399-197a-5b98-b135-4398f2b6de4c", diff --git a/tests/integration/query/one_to_one_to_one/with_order_test.go b/tests/integration/query/one_to_one_to_one/with_order_test.go new file mode 100644 index 0000000000..f9ee84f86c --- /dev/null +++ b/tests/integration/query/one_to_one_to_one/with_order_test.go @@ -0,0 +1,123 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package one_to_one_to_one + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQueryOneToOneToOneWithNestedOrder(t *testing.T) { + test := testUtils.TestCase{ + Description: "One-to-one-to-one relation primary direction", + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Publisher { + name: String + printed: Book @primary + } + + type Book { + name: String + publisher: Publisher + author: Author @primary + } + + type Author { + name: String + published: Book + } + `, + }, + testUtils.CreateDoc{ + CollectionID: 0, + // "bae-1f4cc394-08a8-5825-87b9-b02de2f25f7d" + Doc: `{ + "name": "Old Publisher" + }`, + }, + testUtils.CreateDoc{ + CollectionID: 0, + // "bae-a3cd6fac-13c0-5c8f-970b-0ce7abbb49a5" + Doc: `{ + "name": "New Publisher" + }`, + }, + testUtils.CreateDoc{ + CollectionID: 1, + // "bae-a6cdabfc-17dd-5662-b213-c596ee4c3292" + Doc: `{ + "name": "Painted House", + "publisher_id": "bae-1f4cc394-08a8-5825-87b9-b02de2f25f7d" + }`, + }, + testUtils.CreateDoc{ + CollectionID: 1, + // "bae-bc198c5f-6238-5b50-8072-68dec9c7a16b" + Doc: `{ + "name": "Theif Lord", + "publisher_id": "bae-a3cd6fac-13c0-5c8f-970b-0ce7abbb49a5" + }`, + }, + testUtils.CreateDoc{ + CollectionID: 2, + Doc: `{ + "name": "John Grisham", + "published_id": "bae-a6cdabfc-17dd-5662-b213-c596ee4c3292" + }`, + }, + testUtils.CreateDoc{ + CollectionID: 2, + Doc: `{ + "name": "Cornelia Funke", + "published_id": "bae-bc198c5f-6238-5b50-8072-68dec9c7a16b" + }`, + }, + testUtils.Request{ + Request: `query { + Publisher(order: {printed: {author: {name: ASC}}}) { + name + printed { + name + author { + name + } + } + } + }`, + Results: []map[string]any{ + { + "name": "New Publisher", + "printed": map[string]any{ + "name": "Theif Lord", + "author": map[string]any{ + "name": "Cornelia Funke", + }, + }, + }, + { + "name": "Old Publisher", + "printed": map[string]any{ + "name": "Painted House", + "author": map[string]any{ + "name": "John Grisham", + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, []string{"Publisher", "Book", "Author"}, test) +} diff --git a/tests/integration/query/simple/with_group_test.go b/tests/integration/query/simple/with_group_test.go index 6489e52fb8..c926f580a6 100644 --- a/tests/integration/query/simple/with_group_test.go +++ b/tests/integration/query/simple/with_group_test.go @@ -213,6 +213,7 @@ func TestQuerySimpleWithGroupByWithoutGroupedFieldSelectedWithInnerGroup(t *test Description: "Simple query with groupBy without selecting field grouped by, with inner _group.", Request: `query { Users(groupBy: [Name]) { + Name _group { Age }