# refactor: Fetcher filter and field optimization (#1500)
## Relevant issue(s)

Resolves #490 
Resolves #1582 (indirectly)

## Description

This is a reduced version of #491. It takes a very different approach,
and tries to keep as much of the existing Fetcher structure as possible.

Basically, this will eagerly ignore documents that don't pass the given
filter at the fetcher level. This lets us apply various optimizations that
weren't available when the filter was applied at the scanNode level, as it
was before. A rough sketch of the idea follows.
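
Below is a minimal, self-contained Go sketch of that idea. All names here (`rawDoc`, `fetchFiltered`, the integer field IDs) are hypothetical illustrations rather than DefraDB's actual API; the point is only that the filter fields are decoded and checked before the rest of the document is touched.

```go
package main

import "fmt"

// rawDoc stands in for an undecoded KV document: FieldID -> encoded value.
type rawDoc map[int]string

// fetchFiltered sketches fetcher-level filtering: decode only the fields the
// filter needs, evaluate the filter eagerly, and skip the full decode for
// documents that fail it.
func fetchFiltered(
	docs []rawDoc,
	filterFields []int, // fields the filter predicate reads
	selectFields []int, // fields the query returns
	pass func(partial map[int]string) bool,
) []map[int]string {
	var out []map[int]string
	for _, raw := range docs {
		// 1. Decode only the filter fields ("decoding" is a map lookup here).
		partial := map[int]string{}
		for _, f := range filterFields {
			partial[f] = raw[f]
		}
		// 2. Evaluate the filter before touching the rest of the document.
		if !pass(partial) {
			continue
		}
		// 3. Decode the remaining selected fields, reusing cached values.
		for _, f := range selectFields {
			if _, ok := partial[f]; !ok {
				partial[f] = raw[f]
			}
		}
		out = append(out, partial)
	}
	return out
}

func main() {
	docs := []rawDoc{
		{1: "alice", 2: "33"},
		{1: "bob", 2: "18"},
	}
	adults := fetchFiltered(docs, []int{2}, []int{1, 2},
		func(p map[int]string) bool { return p[2] >= "21" })
	fmt.Println(adults) // [map[1:alice 2:33]]
}
```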
jsimnz authored Jun 27, 2023
1 parent 4fe32de commit 6c0c5bd
Showing 46 changed files with 906 additions and 365 deletions.
client/descriptions.go: 24 changes (12 additions & 12 deletions)

@@ -39,18 +39,6 @@ func (col CollectionDescription) IDString() string {
 	return fmt.Sprint(col.ID)
 }
 
-// GetField returns the field of the given name.
-func (col CollectionDescription) GetField(name string) (FieldDescription, bool) {
-	if !col.Schema.IsEmpty() {
-		for _, field := range col.Schema.Fields {
-			if field.Name == name {
-				return field, true
-			}
-		}
-	}
-	return FieldDescription{}, false
-}
-
 // GetFieldByID searches for a field with the given ID. If such a field is found it
 // will return it and true, if it is not found it will return false.
 func (col CollectionDescription) GetFieldByID(id FieldID) (FieldDescription, bool) {
@@ -118,6 +106,18 @@ func (sd SchemaDescription) GetFieldKey(fieldName string) uint32 {
 	return uint32(0)
 }
 
+// GetField returns the field of the given name.
+func (sd SchemaDescription) GetField(name string) (FieldDescription, bool) {
+	if !sd.IsEmpty() {
+		for _, field := range sd.Fields {
+			if field.Name == name {
+				return field, true
+			}
+		}
+	}
+	return FieldDescription{}, false
+}
+
 // FieldKind describes the type of a field.
 type FieldKind uint8
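For readers migrating call sites: the lookup simply moves from `CollectionDescription` to `SchemaDescription`, so `col.GetField(name)` becomes `col.Schema.GetField(name)` (as the `db/collection*.go` hunks below show). A simplified, runnable sketch of the new shape, where the type definitions are illustrative stand-ins rather than the real `client` package:

```go
package main

import "fmt"

// Minimal stand-ins for the client types (illustrative, not the real ones).
type FieldDescription struct{ Name string }

type SchemaDescription struct{ Fields []FieldDescription }

func (sd SchemaDescription) IsEmpty() bool { return len(sd.Fields) == 0 }

// GetField mirrors the method moved onto SchemaDescription in this commit.
func (sd SchemaDescription) GetField(name string) (FieldDescription, bool) {
	if !sd.IsEmpty() {
		for _, field := range sd.Fields {
			if field.Name == name {
				return field, true
			}
		}
	}
	return FieldDescription{}, false
}

type CollectionDescription struct{ Schema SchemaDescription }

func main() {
	col := CollectionDescription{Schema: SchemaDescription{
		Fields: []FieldDescription{{Name: "name"}, {Name: "age"}},
	}}
	// Call sites change from col.GetField(...) to col.Schema.GetField(...).
	if field, ok := col.Schema.GetField("age"); ok {
		fmt.Println("found field:", field.Name)
	}
}
```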
core/data.go: 6 changes (0 additions & 6 deletions)

@@ -156,12 +156,6 @@ func NewSpans(spans ...Span) Spans {
 	}
 }
 
-// KeyValue is a KV store response containing the resulting core.Key and byte array value.
-type KeyValue struct {
-	Key DataStoreKey
-	Value []byte
-}
-
 // HeadKeyValue is a KV store response containing the resulting core.HeadStoreKey
 // and byte array value.
 type HeadKeyValue struct {
db/collection.go: 2 changes (1 addition & 1 deletion)

@@ -868,7 +868,7 @@ func (c *collection) save(
 		return cid.Undef, client.NewErrFieldNotExist(k)
 	}
 
-	fieldDescription, valid := c.desc.GetField(k)
+	fieldDescription, valid := c.desc.Schema.GetField(k)
 	if !valid {
 		return cid.Undef, client.NewErrFieldNotExist(k)
 	}
db/collection_get.go: 4 changes (2 additions & 2 deletions)

@@ -47,14 +47,14 @@ func (c *collection) get(
 	ctx context.Context,
 	txn datastore.Txn,
 	key core.PrimaryDataStoreKey,
-	fields []*client.FieldDescription,
+	fields []client.FieldDescription,
 	showDeleted bool,
 ) (*client.Document, error) {
 	// create a new document fetcher
 	df := c.newFetcher()
 	desc := &c.desc
 	// initialize it with the primary index
-	err := df.Init(&c.desc, fields, false, showDeleted)
+	err := df.Init(&c.desc, fields, nil, nil, false, showDeleted)
 	if err != nil {
 		_ = df.Close()
 		return nil, err
db/collection_index.go: 16 changes (8 additions & 8 deletions)

@@ -128,20 +128,20 @@ func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *cl
 }
 
 // collectIndexedFields returns all fields that are indexed by all collection indexes.
-func (c *collection) collectIndexedFields() []*client.FieldDescription {
-	fieldsMap := make(map[string]*client.FieldDescription)
+func (c *collection) collectIndexedFields() []client.FieldDescription {
+	fieldsMap := make(map[string]client.FieldDescription)
 	for _, index := range c.indexes {
 		for _, field := range index.Description().Fields {
 			for i := range c.desc.Schema.Fields {
-				colField := &c.desc.Schema.Fields[i]
+				colField := c.desc.Schema.Fields[i]
 				if field.Name == colField.Name {
 					fieldsMap[field.Name] = colField
 					break
 				}
 			}
 		}
 	}
-	fields := make([]*client.FieldDescription, 0, len(fieldsMap))
+	fields := make([]client.FieldDescription, 0, len(fieldsMap))
 	for _, field := range fieldsMap {
 		fields = append(fields, field)
 	}
@@ -260,11 +260,11 @@ func (c *collection) createIndex(
 func (c *collection) iterateAllDocs(
 	ctx context.Context,
 	txn datastore.Txn,
-	fields []*client.FieldDescription,
+	fields []client.FieldDescription,
 	exec func(doc *client.Document) error,
 ) error {
 	df := c.newFetcher()
-	err := df.Init(&c.desc, fields, false, false)
+	err := df.Init(&c.desc, fields, nil, nil, false, false)
 	if err != nil {
 		_ = df.Close()
 		return err
@@ -302,10 +302,10 @@ func (c *collection) indexExistingDocs(
 	txn datastore.Txn,
 	index CollectionIndex,
 ) error {
-	fields := make([]*client.FieldDescription, 0, 1)
+	fields := make([]client.FieldDescription, 0, 1)
 	for _, field := range index.Description().Fields {
 		for i := range c.desc.Schema.Fields {
-			colField := &c.desc.Schema.Fields[i]
+			colField := c.desc.Schema.Fields[i]
 			if field.Name == colField.Name {
 				fields = append(fields, colField)
 				break
db/collection_update.go: 4 changes (2 additions & 2 deletions)

@@ -305,7 +305,7 @@ func (c *collection) applyMerge(
 		return ErrInvalidMergeValueType
 	}
 
-	fd, valid := c.desc.GetField(mfield)
+	fd, valid := c.desc.Schema.GetField(mfield)
 	if !valid {
 		return client.NewErrFieldNotExist(mfield)
 	}
@@ -398,7 +398,7 @@ func (c *collection) isSecondaryIDField(fieldDesc client.FieldDescription) (clie
 		return client.FieldDescription{}, false
 	}
 
-	relationFieldDescription, valid := c.Description().GetField(
+	relationFieldDescription, valid := c.Description().Schema.GetField(
 		strings.TrimSuffix(fieldDesc.Name, request.RelatedObjectID),
 	)
 	return relationFieldDescription, valid && !relationFieldDescription.IsPrimaryRelation()
db/fetcher/encoded_doc.go: 71 changes (57 additions & 14 deletions)

@@ -13,6 +13,7 @@ package fetcher
 import (
 	"fmt"
 
+	"github.com/bits-and-blooms/bitset"
 	"github.com/fxamacker/cbor/v2"
 	"github.com/sourcenetwork/immutable"
 
@@ -24,12 +25,12 @@ type EncodedDocument interface {
 	// Key returns the key of the document
 	Key() []byte
 	// Reset re-initializes the EncodedDocument object.
-	Reset(newKey []byte)
+	Reset()
 	// Decode returns a properly decoded document object
 	Decode() (*client.Document, error)
 	// DecodeToDoc returns a decoded document as a
 	// map of field/value pairs
-	DecodeToDoc(*core.DocumentMapping) (core.Doc, error)
+	DecodeToDoc() (core.Doc, error)
 }
 
 type EPTuple []encProperty
@@ -39,6 +40,10 @@ type encProperty struct {
 	Desc client.FieldDescription
 	Raw []byte
 
+	// Filter flag to determine if this field
+	// is needed for eager filter evaluation
+	IsFilter bool
+
 	// // encoding meta data
 	// encoding base.DataEncoding
 }
@@ -190,8 +195,20 @@ func convertToInt(propertyName string, untypedValue any) (int64, error) {
 
 // @todo: Implement Encoded Document type
 type encodedDocument struct {
+	mapping *core.DocumentMapping
+	doc *core.Doc
+
 	key []byte
-	properties map[client.FieldDescription]*encProperty
+	Properties []*encProperty
+
+	// tracking bitsets
+	// A value of 1 indicates a required field
+	// 0 means we ignore the field
+	// we update the bitsets as we collect values
+	// by clearing the bit for the FieldID
+	filterSet *bitset.BitSet // filter fields
+	selectSet *bitset.BitSet // select fields
+
 }
 
 var _ EncodedDocument = (*encodedDocument)(nil)
@@ -201,9 +218,15 @@ func (encdoc *encodedDocument) Key() []byte {
 }
 
 // Reset re-initializes the EncodedDocument object.
-func (encdoc *encodedDocument) Reset(newKey []byte) {
-	encdoc.properties = make(map[client.FieldDescription]*encProperty)
-	encdoc.key = newKey
+func (encdoc *encodedDocument) Reset() {
+	encdoc.Properties = make([]*encProperty, 0)
+	encdoc.key = nil
+	if encdoc.mapping != nil {
+		doc := encdoc.mapping.NewDoc()
+		encdoc.doc = &doc
+	}
+	encdoc.filterSet = nil
+	encdoc.selectSet = nil
 }
 
 // Decode returns a properly decoded document object
@@ -213,12 +236,12 @@ func (encdoc *encodedDocument) Decode() (*client.Document, error) {
 		return nil, err
 	}
 	doc := client.NewDocWithKey(key)
-	for fieldDesc, prop := range encdoc.properties {
+	for _, prop := range encdoc.Properties {
 		ctype, val, err := prop.Decode()
 		if err != nil {
 			return nil, err
 		}
-		err = doc.SetAs(fieldDesc.Name, val, ctype)
+		err = doc.SetAs(prop.Desc.Name, val, ctype)
 		if err != nil {
 			return nil, err
 		}
@@ -229,15 +252,35 @@ func (encdoc *encodedDocument) Decode() (*client.Document, error) {
 
 // DecodeToDoc returns a decoded document as a
 // map of field/value pairs
-func (encdoc *encodedDocument) DecodeToDoc(mapping *core.DocumentMapping) (core.Doc, error) {
-	doc := mapping.NewDoc()
-	doc.SetKey(string(encdoc.key))
-	for fieldDesc, prop := range encdoc.properties {
+func (encdoc *encodedDocument) DecodeToDoc() (core.Doc, error) {
+	return encdoc.decodeToDoc(false)
+}
+
+func (encdoc *encodedDocument) decodeToDocForFilter() (core.Doc, error) {
+	return encdoc.decodeToDoc(true)
+}
+
+func (encdoc *encodedDocument) decodeToDoc(filter bool) (core.Doc, error) {
+	if encdoc.mapping == nil {
+		return core.Doc{}, ErrMissingMapper
+	}
+	if encdoc.doc == nil {
+		doc := encdoc.mapping.NewDoc()
+		encdoc.doc = &doc
+	}
+	encdoc.doc.SetKey(string(encdoc.key))
+	for _, prop := range encdoc.Properties {
+		if encdoc.doc.Fields[prop.Desc.ID] != nil { // use cached decoded fields
+			continue
+		}
+		if filter && !prop.IsFilter { // only get filter fields if filter=true
+			continue
+		}
 		_, val, err := prop.Decode()
 		if err != nil {
 			return core.Doc{}, err
 		}
-		doc.Fields[fieldDesc.ID] = val
+		encdoc.doc.Fields[prop.Desc.ID] = val
 	}
-	return doc, nil
+	return *encdoc.doc, nil
 }
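The `filterSet`/`selectSet` bitsets and the decode cache above are what drive the eager evaluation. As a rough, stand-alone illustration of that mechanism (the field IDs and scan loop are invented for the example; only the `bits-and-blooms/bitset` package matches the commit's import), clearing one bit per collected field makes "all filter fields collected" a cheap `None()` check:

```go
package main

import (
	"fmt"

	"github.com/bits-and-blooms/bitset"
)

func main() {
	const numFields = 8

	// One bit per FieldID; a set bit means the field is still required.
	filterSet := bitset.New(numFields) // fields the filter predicate reads
	selectSet := bitset.New(numFields) // fields the query returns

	filterSet.Set(2)        // e.g. the filter reads field 2
	selectSet.Set(1).Set(2) // e.g. the query selects fields 1 and 2

	// As each KV pair of the document is scanned, clear the bit of the
	// field just collected.
	for _, fieldID := range []uint{2, 1, 5} {
		filterSet.Clear(fieldID)
		selectSet.Clear(fieldID)

		// Once every filter field is in hand, the filter can run eagerly;
		// if the document fails it, the remaining fields are never decoded.
		if filterSet.None() {
			fmt.Printf("field %d collected: filter can now be evaluated\n", fieldID)
			break
		}
	}
}
```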
db/fetcher/errors.go: 2 changes (2 additions & 0 deletions)

@@ -25,6 +25,7 @@ const (
 	errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf"
 	errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG"
 	errFailedToGetDagNode string = "failed to get DAG Node"
+	errMissingMapper string = "missing document mapper"
 )
 
 var (
@@ -38,6 +39,7 @@ var (
 	ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode)
 	ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink)
 	ErrFailedToGetDagNode = errors.New(errFailedToGetDagNode)
+	ErrMissingMapper = errors.New(errMissingMapper)
 	ErrSingleSpanOnly = errors.New("spans must contain only a single entry")
 )