Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Datastore based pinner #4

Merged
merged 29 commits into from
Nov 30, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9fc54f6
start restructuring for move to datastore pinner
aschmahmann Sep 30, 2020
be03825
Store pins in datastore instead of in mdag
gammazero Oct 26, 2020
77c9a1d
Import and Export functions. Cid stored as bytes. Revise indexer inte…
gammazero Oct 27, 2020
5398bb2
add name index
gammazero Oct 27, 2020
922fab7
add benchmarks
gammazero Oct 27, 2020
fdf37b1
Use dirty flag to determine when to rebuild indexes
gammazero Oct 28, 2020
a2720f1
Fix benchmarks
gammazero Oct 28, 2020
37293b6
Do not keep pinned CID sets in memory (no-cache implementation)
gammazero Oct 29, 2020
0244724
Add comments and unit test
gammazero Oct 29, 2020
22a61da
Speed up pinning by avoining 2nd recursive check if no changes
gammazero Oct 29, 2020
1662bb8
correct log level
gammazero Oct 29, 2020
cad8378
improve import/export unit test
gammazero Oct 29, 2020
d1a44d7
Update returns error if from CID is not pinned, even when from and to…
gammazero Nov 16, 2020
9bb7a0c
test update of same pin
gammazero Nov 16, 2020
eb32271
Cleanup and better test coverage
gammazero Nov 17, 2020
22254b2
Change requested in review
gammazero Nov 19, 2020
d787682
Additional changes from review
gammazero Nov 19, 2020
0435ac4
Removed New in favor of only having LoadPinner
gammazero Nov 19, 2020
dde41e9
Indexer encodes index and key to allow arbitrary strings
gammazero Nov 19, 2020
0a5737f
Use int64 for dirty count and remove unused const
gammazero Nov 19, 2020
fb419e7
use base64 encoding
gammazero Nov 20, 2020
00764f0
Encode using multibase
gammazero Nov 20, 2020
a6d812c
Changes from review
gammazero Nov 24, 2020
86f36c2
Rename LoadPinner to New for both pinners
gammazero Nov 24, 2020
7a128c6
Check context when loading pinner and during iterative operations
gammazero Nov 24, 2020
cd2065d
indexer.New takes ds.Key
gammazero Nov 24, 2020
9c335cd
Change pin encoding. Add unit test
gammazero Nov 24, 2020
9fafc51
switch to atlas pin encoding
aschmahmann Nov 30, 2020
2586c60
removed type annotations from pin struct
aschmahmann Nov 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dsindex/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package dsindex
import "errors"

var (
ErrEmptyIndex = errors.New("index is empty")
ErrEmptyKey = errors.New("key is empty")
ErrEmptyValue = errors.New("value is empty")
)
169 changes: 85 additions & 84 deletions dsindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,103 +7,105 @@ import (
"path"

ds "github.com/ipfs/go-datastore"
query "github.com/ipfs/go-datastore/query"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
"github.com/multiformats/go-multibase"
)

// Indexer maintains a secondary index. Each value of the secondary index maps
// to one more primary keys.
// Indexer maintains a secondary index. An index is a collection of key-value
// mappings where the key is the secondary index that maps to one or more
// values, where each value is a unique key being indexed.
type Indexer interface {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
// Add adds the specified key to the an index
Add(ctx context.Context, index, key string) error
// Add adds the specified value to the key
Add(ctx context.Context, key, value string) error

// Delete deletes the specified key from the index. If the key is not in
// Delete deletes the specified value from the key. If the value is not in
// the datastore, this method returns no error.
Delete(ctx context.Context, index, key string) error
Delete(ctx context.Context, key, value string) error

// DeleteIndex deletes all keys in the given index. If an index is not in
// the datastore, this method returns no error.
DeleteIndex(ctx context.Context, index string) (count int, err error)
// DeleteKey deletes all values in the given key. If a key is not in the
// datastore, this method returns no error. Returns a count of values that
// were deleted.
DeleteKey(ctx context.Context, key string) (count int, err error)

// DeleteAll deletes all indexes managed by this Indexer
// DeleteAll deletes all keys managed by this Indexer. Returns a count of
// the values that were deleted.
DeleteAll(ctx context.Context) (count int, err error)

// ForEach calls the function for each key in the specified index, until
// there are no more keys, or until the function returns false. If index
// is empty string, then all indexs are iterated.
ForEach(ctx context.Context, index string, fn func(index, key string) bool) error
// ForEach calls the function for each value in the specified index, until
// there are no more values, or until the function returns false. If key
// is empty string, then all keys are iterated.
ForEach(ctx context.Context, index string, fn func(key, value string) bool) error
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

// HasKey determines if the index contains the specified key
HasKey(ctx context.Context, index, key string) (bool, error)
// HasValue determines if the key contains the specified value
HasValue(ctx context.Context, key, value string) (bool, error)

// HasAny determines if any key is in the specified index. If index is
// empty string, then all indexes are searched.
HasAny(ctx context.Context, index string) (bool, error)
// HasAny determines if any value is in the specified key. If key is
// empty string, then all values are searched.
HasAny(ctx context.Context, key string) (bool, error)

// Search returns all keys for the given index
Search(ctx context.Context, index string) (ids []string, err error)
// Search returns all values for the given key
Search(ctx context.Context, key string) (values []string, err error)
}

// indexer is a simple implementation of Indexer. This implementation relies
// on the underlying data store supporting efficent querying by prefix.
// on the underlying data store to support efficient querying by prefix.
//
// TODO: Consider adding caching
type indexer struct {
dstore ds.Datastore
indexPath string
dstore ds.Datastore
}

// New creates a new datastore index. All indexes are stored prefixed with the
// specified index path.
// New creates a new datastore index. All indexes are stored under the
// specified index name.
//
// To persist the actions of calling Indexer functions, it is necessary to call
// dstore.Sync.
func New(dstore ds.Datastore, indexPath string) Indexer {
func New(dstore ds.Datastore, name string) Indexer {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return &indexer{
dstore: dstore,
indexPath: indexPath,
dstore: namespace.Wrap(dstore, ds.NewKey(name)),
}
}

func (x *indexer) Add(ctx context.Context, index, key string) error {
if index == "" {
return ErrEmptyIndex
}
func (x *indexer) Add(ctx context.Context, key, value string) error {
if key == "" {
return ErrEmptyKey
}
dskey := ds.NewKey(path.Join(x.indexPath, encode(index), encode(key)))
return x.dstore.Put(dskey, []byte{})
if value == "" {
return ErrEmptyValue
}
dsKey := ds.NewKey(encode(key)).ChildString(encode(value))
return x.dstore.Put(dsKey, []byte{})
}

func (x *indexer) Delete(ctx context.Context, index, key string) error {
if index == "" {
return ErrEmptyIndex
}
func (x *indexer) Delete(ctx context.Context, key, value string) error {
if key == "" {
return ErrEmptyKey
}
dskey := ds.NewKey(path.Join(x.indexPath, encode(index), encode(key)))
return x.dstore.Delete(dskey)
if value == "" {
return ErrEmptyValue
}
return x.dstore.Delete(ds.NewKey(encode(key)).ChildString(encode(value)))
}

func (x *indexer) DeleteIndex(ctx context.Context, index string) (int, error) {
if index == "" {
return 0, ErrEmptyIndex
func (x *indexer) DeleteKey(ctx context.Context, key string) (int, error) {
if key == "" {
return 0, ErrEmptyKey
}
return x.deletePrefix(ctx, path.Join(x.indexPath, encode(index)))
return x.deletePrefix(ctx, encode(key))
}

func (x *indexer) DeleteAll(ctx context.Context) (int, error) {
return x.deletePrefix(ctx, x.indexPath)
return x.deletePrefix(ctx, "")
}

func (x *indexer) ForEach(ctx context.Context, index string, fn func(idx, key string) bool) error {
if index != "" {
index = encode(index)
func (x *indexer) ForEach(ctx context.Context, key string, fn func(key, value string) bool) error {
if key != "" {
key = encode(key)
}

q := query.Query{
Prefix: path.Join(x.indexPath, index),
Prefix: key,
KeysOnly: true,
}
results, err := x.dstore.Query(q)
Expand Down Expand Up @@ -141,56 +143,55 @@ func (x *indexer) ForEach(ctx context.Context, index string, fn func(idx, key st
return err
}

func (x *indexer) HasKey(ctx context.Context, index, key string) (bool, error) {
if index == "" {
return false, ErrEmptyIndex
}
func (x *indexer) HasValue(ctx context.Context, key, value string) (bool, error) {
if key == "" {
return false, ErrEmptyKey
}
dskey := ds.NewKey(path.Join(x.indexPath, encode(index), encode(key)))
return x.dstore.Has(dskey)
if value == "" {
return false, ErrEmptyValue
}
return x.dstore.Has(ds.NewKey(encode(key)).ChildString(encode(value)))
}

func (x *indexer) HasAny(ctx context.Context, index string) (bool, error) {
func (x *indexer) HasAny(ctx context.Context, key string) (bool, error) {
var any bool
err := x.ForEach(ctx, index, func(idx, key string) bool {
err := x.ForEach(ctx, key, func(key, value string) bool {
any = true
return false
})
return any, err
}

func (x *indexer) Search(ctx context.Context, index string) ([]string, error) {
if index == "" {
return nil, ErrEmptyIndex
func (x *indexer) Search(ctx context.Context, key string) ([]string, error) {
if key == "" {
return nil, ErrEmptyKey
}
ents, err := x.queryPrefix(ctx, path.Join(x.indexPath, encode(index)))
ents, err := x.queryPrefix(ctx, encode(key))
if err != nil {
return nil, err
}
if len(ents) == 0 {
return nil, nil
}

keys := make([]string, len(ents))
values := make([]string, len(ents))
for i := range ents {
keys[i], err = decode(path.Base(ents[i].Key))
values[i], err = decode(path.Base(ents[i].Key))
if err != nil {
return nil, fmt.Errorf("cannot decode key: %v", err)
return nil, fmt.Errorf("cannot decode value: %v", err)
}
}
return keys, nil
return values, nil
}

// SyncIndex synchronizes the indexes in the target Indexer to match those of
// the ref Indexer. The indexPath prefix is not synchronized, only the
// index/key portion of the indexes.
// SyncIndex synchronizes the keys in the target Indexer to match those of the
// ref Indexer. The name portion of the stored data is not synchronized, only
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
// the key/value portion of the indexes.
func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {
// Build reference index map
refs := map[string]string{}
err := ref.ForEach(ctx, "", func(idx, key string) bool {
refs[key] = idx
err := ref.ForEach(ctx, "", func(key, value string) bool {
refs[value] = key
return true
})
if err != nil {
Expand All @@ -202,31 +203,31 @@ func SyncIndex(ctx context.Context, ref, target Indexer) (bool, error) {

// Compare current indexes
dels := map[string]string{}
err = target.ForEach(ctx, "", func(idx, key string) bool {
refIdx, ok := refs[key]
if ok && refIdx == idx {
// same in both; delete from refs, do not add to delKeys
delete(refs, key)
err = target.ForEach(ctx, "", func(key, value string) bool {
refKey, ok := refs[value]
if ok && refKey == key {
// same in both; delete from refs, do not add to dels
delete(refs, value)
} else {
dels[key] = idx
dels[value] = key
}
return true
})
if err != nil {
return false, err
}

// Items in dels are indexes that no longer exist
for key, idx := range dels {
err = target.Delete(ctx, idx, key)
// Items in dels are keys that no longer exist
for value, key := range dels {
err = target.Delete(ctx, key, value)
if err != nil {
return false, err
}
}

// What remains in refs are indexes that need to be added
for key, idx := range refs {
err = target.Add(ctx, idx, key)
// What remains in refs are keys that need to be added
for value, key := range refs {
err = target.Add(ctx, key, value)
if err != nil {
return false, err
}
Expand Down
Loading