remove map datastore support
fredcarle committed Nov 13, 2022
1 parent c0f8a31 commit f60e17c
Showing 18 changed files with 40 additions and 175 deletions.
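In short: the rootstore type changes from ds.Batching to the new datastore.RootStore interface (Batching plus TxnDatastore), so the go-datastore MapDatastore, which has no transaction support, can no longer be used as a backing store. Wherever the map datastore was used, the in-memory Badger store takes its place. A minimal sketch of the replacement pattern, mirroring the db_test.go and versioned.go hunks below (error handling is illustrative only):

```go
package main

import (
	"context"

	badger "github.com/dgraph-io/badger/v3"

	badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
	"github.com/sourcenetwork/defradb/db"
)

func main() {
	ctx := context.Background()

	// Before this commit: rootstore := ds.NewMapDatastore()
	// After: an in-memory Badger store, which supports real transactions.
	opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
	rootstore, err := badgerds.NewDatastore("", &opts)
	if err != nil {
		panic(err)
	}
	defer rootstore.Close()

	// NewDB now takes a datastore.RootStore instead of a ds.Batching.
	defraDB, err := db.NewDB(ctx, rootstore)
	if err != nil {
		panic(err)
	}
	_ = defraDB
}
```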
4 changes: 2 additions & 2 deletions cli/serverdump.go
@@ -15,9 +15,9 @@ import (
"os"
"os/signal"

ds "github.com/ipfs/go-datastore"
"github.com/spf13/cobra"

ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
@@ -36,7 +36,7 @@ var serverDumpCmd = &cobra.Command{
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)

var rootstore ds.Batching
var rootstore ds.RootStore
var err error
if datastore == badgerDatastoreName {
info, err := os.Stat(cfg.Datastore.Badger.Path)
4 changes: 2 additions & 2 deletions cli/start.go
@@ -20,7 +20,6 @@ import (
"strings"

badger "github.com/dgraph-io/badger/v3"
ds "github.com/ipfs/go-datastore"
ma "github.com/multiformats/go-multiaddr"
"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand All @@ -30,6 +29,7 @@ import (
httpapi "github.com/sourcenetwork/defradb/api/http"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/config"
ds "github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
"github.com/sourcenetwork/defradb/db"
"github.com/sourcenetwork/defradb/errors"
@@ -212,7 +212,7 @@ func (di *defraInstance) close(ctx context.Context) {
func start(ctx context.Context) (*defraInstance, error) {
log.FeedbackInfo(ctx, "Starting DefraDB service...")

var rootstore ds.Batching
var rootstore ds.RootStore

var err error
if cfg.Datastore.Store == badgerDatastoreName {
3 changes: 1 addition & 2 deletions client/db.go
@@ -13,7 +13,6 @@ package client
import (
"context"

ds "github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"

"github.com/sourcenetwork/defradb/datastore"
@@ -27,7 +26,7 @@ type DB interface {
GetCollectionBySchemaID(context.Context, string) (Collection, error)
GetAllCollections(ctx context.Context) ([]Collection, error)

Root() ds.Batching
Root() datastore.RootStore
Blockstore() blockstore.Blockstore

NewTxn(context.Context, bool) (datastore.Txn, error)
6 changes: 6 additions & 0 deletions datastore/store.go
@@ -22,6 +22,12 @@ var (
log = logging.MustNewLogger("defradb.store")
)

// RootStore wraps Batching and TxnDatastore, requiring the underlying datastore to support both batching and transactions.
type RootStore interface {
ds.Batching
ds.TxnDatastore
}

// MultiStore is an interface wrapper around the 3 main types of stores needed for MerkleCRDTs.
type MultiStore interface {
Rootstore() DSReaderWriter
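The elided lines above hold the usual go-datastore import aliases; RootStore simply composes two upstream interfaces, so any store implementing both satisfies it with no extra glue. A hedged illustration of compile-time checks (it assumes the Badger wrapper's exported struct is named Datastore, as in the upstream go-ds-badger package it is based on):

```go
package datastore_test

import (
	ds "github.com/ipfs/go-datastore"

	"github.com/sourcenetwork/defradb/datastore"
	badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
)

// Illustrative compile-time checks.
var (
	// The Badger wrapper offers both Batch and NewTransaction, so it is a RootStore.
	_ datastore.RootStore = (*badgerds.Datastore)(nil)

	// MapDatastore only batches; the equivalent RootStore assertion would not
	// compile, which is exactly why map datastore support is removed here:
	// var _ datastore.RootStore = (*ds.MapDatastore)(nil)
	_ ds.Batching = (*ds.MapDatastore)(nil)
)
```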
44 changes: 4 additions & 40 deletions datastore/txn.go
@@ -22,8 +22,6 @@ import (
type Txn interface {
MultiStore

IsBatch() bool

// Commit finalizes a transaction, attempting to commit it to the Datastore.
// May return an error if the transaction has gone stale. The presence of an
// error is an indication that the data was not committed to the Datastore.
Expand All @@ -41,15 +39,14 @@ type Txn interface {
type txn struct {
t ds.Txn
MultiStore
isBatch bool

successFns []func()
errorFns []func()
}

var _ Txn = (*txn)(nil)

func NewTxnFrom(ctx context.Context, rootstore ds.Batching, readonly bool) (Txn, error) {
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (Txn, error) {
// check if our datastore natively supports iterable transactions; otherwise fall back to a plain transaction
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
@@ -60,40 +57,21 @@ func NewTxnFrom(ctx context.Context, rootstore ds.Batching, readonly bool) (Txn,
return &txn{
rootTxn,
multistore,
false,
[]func(){},
[]func(){},
}, nil
}

var rootTxn ds.Txn
var err error
var isBatch bool
if txnStore, ok := rootstore.(ds.TxnDatastore); ok {
rootTxn, err = txnStore.NewTransaction(ctx, readonly)
if err != nil {
return nil, err
}
} else {
batcher, err := rootstore.Batch(ctx)
if err != nil {
return nil, err
}

// hide a ds.Batching store as a ds.Txn
rootTxn = ShimBatcherTxn{
Read: rootstore,
Batch: batcher,
}
isBatch = true
rootTxn, err := rootstore.NewTransaction(ctx, readonly)
if err != nil {
return nil, err
}

root := AsDSReaderWriter(ShimTxnStore{rootTxn})
multistore := MultiStoreFrom(root)
return &txn{
rootTxn,
multistore,
isBatch,
[]func(){},
[]func(){},
}, nil
@@ -138,10 +116,6 @@ func (txn *txn) runSuccessFns(ctx context.Context) {
}
}

func (txn *txn) IsBatch() bool {
return txn.isBatch
}

// Shim to make ds.Txn support ds.Datastore
type ShimTxnStore struct {
ds.Txn
@@ -155,13 +129,3 @@ func (ts ShimTxnStore) Close() error {
ts.Discard(context.TODO())
return nil
}

// shim to make ds.Batch implement ds.Datastore
type ShimBatcherTxn struct {
ds.Read
ds.Batch
}

func (ShimBatcherTxn) Discard(_ context.Context) {
// noop
}
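With the ShimBatcherTxn fallback gone, NewTxnFrom has only two paths left: an iterable transaction when the store supports it, or a plain transaction from ds.TxnDatastore. A rough sketch of a call site under the new signature (doInTxn is a hypothetical helper, not part of this commit):

```go
package example

import (
	"context"

	"github.com/sourcenetwork/defradb/datastore"
)

// doInTxn sketches the new transaction flow: the rootstore must now be a
// ds.TxnDatastore, which datastore.RootStore guarantees.
func doInTxn(ctx context.Context, rootstore datastore.RootStore) error {
	txn, err := datastore.NewTxnFrom(ctx, rootstore, false) // false = read-write
	if err != nil {
		return err
	}
	defer txn.Discard(ctx) // releases the transaction; typically a no-op after a successful Commit

	// Reads and writes go through the MultiStore views, e.g. txn.Rootstore().
	// Nothing is visible outside the transaction until Commit succeeds, which is
	// why the IsBatch() early commits elsewhere in this diff can be deleted.

	return txn.Commit(ctx)
}
```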
11 changes: 5 additions & 6 deletions datastore/wrappedStore.go
@@ -15,7 +15,6 @@ import (

ds "github.com/ipfs/go-datastore"
ktds "github.com/ipfs/go-datastore/keytransform"
"github.com/ipfs/go-datastore/query"
dsq "github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/defradb/datastore/iterable"
@@ -55,7 +54,7 @@ func (w *wrappedStore) Delete(ctx context.Context, key ds.Key) error {
return w.store.Delete(ctx, w.transform.ConvertKey(key))
}

func (w *wrappedStore) GetIterator(q query.Query) (iterable.Iterator, error) {
func (w *wrappedStore) GetIterator(q dsq.Query) (iterable.Iterator, error) {
iterator, err := w.store.GetIterator(
withPrefix(q, w.transform.ConvertKey(ds.NewKey(q.Prefix)).String()),
)
@@ -65,8 +64,8 @@ func (w *wrappedStore) GetIterator(q query.Query) (iterable.Iterator, error) {
return &wrappedIterator{transform: w.transform, iterator: iterator}, nil
}

func withPrefix(q query.Query, prefix string) query.Query {
return query.Query{
func withPrefix(q dsq.Query, prefix string) dsq.Query {
return dsq.Query{
Prefix: prefix,
Filters: q.Filters,
Orders: q.Orders,
@@ -108,10 +107,10 @@ func (w *wrappedStore) Query(ctx context.Context, q dsq.Query) (dsq.Results, err
return dsq.NaiveQueryApply(nq, qr), nil
}

// Split the query into a child query and a naive query. That way, we can make
// Split the query into a child query and a naive dsq. That way, we can make
// the child datastore do as much work as possible.
func (w *wrappedStore) prepareQuery(q dsq.Query) (naive, child dsq.Query) {
// First, put everything in the child query. Then, start taking things
// First, put everything in the child dsq. Then, start taking things
// out.
child = q

13 changes: 0 additions & 13 deletions db/collection.go
@@ -498,19 +498,6 @@ func (c *collection) create(ctx context.Context, txn datastore.Txn, doc *client.
return err
}

// If this a Batch masked as a Transaction
// commit our writes so we can see them.
// Batches don't maintain serializability, or
// linearization, or any other transaction
// semantics, which the user already knows
// otherwise they wouldn't use a datastore
// that doesn't support proper transactions.
// So let's just commit, and keep going.
if txn.IsBatch() {
if err := txn.Commit(ctx); err != nil {
return err
}
}
return err
}

16 changes: 0 additions & 16 deletions db/collection_update.go
@@ -427,22 +427,6 @@ func (c *collection) applyMerge(
)
}

// If this a a Batch masked as a Transaction
// commit our writes so we can see them.
// Batches don't maintain serializability, or
// linearization, or any other transaction
// semantics, which the user already knows
// otherwise they wouldn't use a datastore
// that doesn't support proper transactions.
// So let's just commit, and keep going.
// @todo: Change this on the Txn.BatchShim
// structure
if txn.IsBatch() {
if err := txn.Commit(ctx); err != nil {
return err
}
}

return nil
}

8 changes: 4 additions & 4 deletions db/db.go
@@ -53,7 +53,7 @@ var (
type db struct {
glock sync.RWMutex

rootstore ds.Batching
rootstore datastore.RootStore
multistore datastore.MultiStore

crdtFactory *crdt.Factory
Expand All @@ -80,11 +80,11 @@ func WithUpdateEvents() Option {
}

// NewDB creates a new instance of the DB using the given options.
func NewDB(ctx context.Context, rootstore ds.Batching, options ...Option) (client.DB, error) {
func NewDB(ctx context.Context, rootstore datastore.RootStore, options ...Option) (client.DB, error) {
return newDB(ctx, rootstore, options...)
}

func newDB(ctx context.Context, rootstore ds.Batching, options ...Option) (*db, error) {
func newDB(ctx context.Context, rootstore datastore.RootStore, options ...Option) (*db, error) {
log.Debug(ctx, "Loading: internal datastores")
root := datastore.AsDSReaderWriter(rootstore)
multistore := datastore.MultiStoreFrom(root)
@@ -125,7 +125,7 @@ func (db *db) NewTxn(ctx context.Context, readonly bool) (datastore.Txn, error)
return datastore.NewTxnFrom(ctx, db.rootstore, readonly)
}

func (db *db) Root() ds.Batching {
func (db *db) Root() datastore.RootStore {
return db.rootstore
}

7 changes: 5 additions & 2 deletions db/db_test.go
@@ -15,7 +15,6 @@ import (
"testing"

badger "github.com/dgraph-io/badger/v3"
ds "github.com/ipfs/go-datastore"
dag "github.com/ipfs/go-merkledag"
"github.com/stretchr/testify/assert"

@@ -52,7 +51,11 @@ func TestNewDB(t *testing.T) {

func TestNewDBWithCollection_Errors_GivenNoSchema(t *testing.T) {
ctx := context.Background()
rootstore := ds.NewMapDatastore()
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err := badgerds.NewDatastore("", &opts)
if err != nil {
t.Error(err)
}

db, err := NewDB(ctx, rootstore)
if err != nil {
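The tests now repeat the same in-memory Badger setup inline; a small helper along these lines could factor it out (newMemoryRootstore and the t.Cleanup wiring are invented for illustration, not part of the commit):

```go
package db

import (
	"testing"

	badger "github.com/dgraph-io/badger/v3"

	"github.com/sourcenetwork/defradb/datastore"
	badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
)

// newMemoryRootstore wraps the in-memory Badger setup used by the updated tests.
func newMemoryRootstore(t *testing.T) datastore.RootStore {
	t.Helper()
	opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
	rootstore, err := badgerds.NewDatastore("", &opts)
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { _ = rootstore.Close() })
	return rootstore
}
```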
32 changes: 10 additions & 22 deletions db/fetcher/versioned.go
@@ -15,6 +15,7 @@ import (
"context"
"fmt"

badger "github.com/dgraph-io/badger/v3"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
format "github.com/ipfs/go-ipld-format"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
badgerds "github.com/sourcenetwork/defradb/datastore/badger/v3"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/merkle/crdt"
@@ -84,7 +86,7 @@ type VersionedFetcher struct {
ctx context.Context

// Transient version store
root ds.Datastore
root datastore.RootStore
store datastore.Txn

key core.DataStoreKey
@@ -152,11 +154,15 @@ func (vf *VersionedFetcher) Start(ctx context.Context, txn datastore.Txn, spans
vf.version = c

// create store
root := ds.NewMapDatastore()
vf.root = root
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
vf.root, err = badgerds.NewDatastore("", &opts)
if err != nil {
return err
}

vf.store, err = datastore.NewTxnFrom(
ctx,
root,
vf.root,
false,
) // we're going to discard and nuke this later
if err != nil {
@@ -214,16 +220,6 @@ func (vf *VersionedFetcher) seekTo(c cid.Cid) error {
return err
}

// after seekNext is completed, we have a populated
// queuedCIDs list, and all the necessary
// blocks in our local store
// If we are using a batch store, then we need to commit
if vf.store.IsBatch() {
if err := vf.store.Commit(vf.ctx); err != nil {
return err
}
}

// if we have a queuedCIDs length of 0, means we don't need
// to do any more state serialization

@@ -248,13 +244,6 @@ func (vf *VersionedFetcher) seekTo(c cid.Cid) error {
}
}

// If we are using a batch store, then we need to commit
if vf.store.IsBatch() {
if err := vf.store.Commit(vf.ctx); err != nil {
return err
}
}

// we now have all the required state stored
// in our transient local Version_Index, we now need to
// transfer it to the Primary_Index.
@@ -431,7 +420,6 @@ func (vf *VersionedFetcher) getDAGNode(c cid.Cid) (*dag.ProtoNode, error) {
}

func (vf *VersionedFetcher) Close() error {
vf.store.Discard(vf.ctx)
if err := vf.root.Close(); err != nil {
return err
}
2 changes: 0 additions & 2 deletions tests/bench/bench_util.go
@@ -251,8 +251,6 @@ func newBenchStoreInfo(ctx context.Context, t testing.TB) (dbInfo, error) {
dbi, err = testutils.NewBadgerMemoryDB(ctx)
case "badger":
dbi, err = testutils.NewBadgerFileDB(ctx, t)
case "memorymap":
dbi, err = testutils.NewMapDB(ctx)
default:
return nil, errors.New(fmt.Sprintf("invalid storage engine backend: %s", storage))
}