Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Asynchronous Datastores #6785

Merged
merged 1 commit into from
Dec 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
blockservice "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
filestore "github.com/ipfs/go-filestore"
bstore "github.com/ipfs/go-ipfs-blockstore"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv)

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv)
// add a sync call to the DagService
// this ensures that data written to the DagService is persisted to the underlying datastore
// TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice
var syncDserv *syncDagService
if settings.OnlyHash {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error { return nil },
}
} else {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func() error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
},
}
}

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,3 +295,13 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}

// syncDagService is used by the Adder to ensure blocks get persisted to the underlying datastore
type syncDagService struct {
ipld.DAGService
syncFn func() error
}

func (s *syncDagService) Sync() error {
return s.syncFn()
}
11 changes: 11 additions & 0 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Link struct {
Size uint64
}

type syncer interface {
Sync() error
}

// NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds)
Expand Down Expand Up @@ -316,6 +320,13 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
return nil, err
}

if asyncDagService, ok := adder.dagService.(syncer); ok {
err = asyncDagService.Sync()
if err != nil {
return nil, err
}
}

Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if !adder.Pin {
return nd, nil
}
Expand Down
39 changes: 36 additions & 3 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
"github.com/ipfs/go-ipfs-exchange-offline"
Expand Down Expand Up @@ -41,18 +42,39 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag)
rootDS := repo.Datastore()

syncFn := func() error {
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
return err
}
return rootDS.Sync(filestore.FilestorePrefix)
}
syncDs := &syncDagService{ds, syncFn}
syncInternalDag := &syncDagService{internalDag, syncFn}

pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicitly on
// node init instead of implicitly here as a result of the pinner keys
// not being found in the datastore.
// this is kinda sketchy and could cause data loss
pinning = pin.NewPinner(repo.Datastore(), ds, internalDag)
pinning = pin.NewPinner(rootDS, syncDs, syncInternalDag)
}

return pinning, nil
}

// syncDagService is used by the Pinner to ensure data gets persisted to the underlying datastore
type syncDagService struct {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
format.DAGService
syncFn func() error
}

func (s *syncDagService) Sync() error {
return s.syncFn()
}

// Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs)
Expand All @@ -77,7 +99,18 @@ func OnlineExchange(provide bool) interface{} {
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error {
return repo.Datastore().Put(dsk, c.Bytes())
rootDS := repo.Datastore()
if err := rootDS.Sync(blockstore.BlockPrefix); err != nil {
return err
}
if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
return err
}

if err := rootDS.Put(dsk, c.Bytes()); err != nil {
return err
}
return rootDS.Sync(dsk)
}

var nd *merkledag.ProtoNode
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-pinner v0.0.2
github.com/ipfs/go-ipfs-pinner v0.0.3
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE=
github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA=
github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ=
github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs=
github.com/ipfs/go-ipfs-pinner v0.0.3 h1:ez/yNYYyH1W7DiCF/L29tmp6L7lBO8eqbJtPi2pHicA=
github.com/ipfs/go-ipfs-pinner v0.0.3/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
Expand Down
6 changes: 5 additions & 1 deletion namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa
}

// Put the new record.
if err := p.ds.Put(IpnsDsKey(id), data); err != nil {
key := IpnsDsKey(id)
if err := p.ds.Put(key, data); err != nil {
return nil, err
}
if err := p.ds.Sync(key); err != nil {
return nil, err
}
return entry, nil
Expand Down
43 changes: 43 additions & 0 deletions namesys/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package namesys
import (
"context"
"crypto/rand"
"github.com/ipfs/go-path"
"testing"
"time"

Expand Down Expand Up @@ -110,3 +111,45 @@ func TestRSAPublisher(t *testing.T) {
func TestEd22519Publisher(t *testing.T) {
testNamekeyPublisher(t, ci.Ed25519, ds.ErrNotFound, false)
}

func TestAsyncDS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rt := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t))
ds := &checkSyncDS{
Datastore: ds.NewMapDatastore(),
syncKeys: make(map[ds.Key]struct{}),
}
publisher := NewIpnsPublisher(rt, ds)

ipnsFakeID := testutil.RandIdentityOrFatal(t)
ipnsVal, err := path.ParsePath("/ipns/foo.bar")
if err != nil {
t.Fatal(err)
}

if err := publisher.Publish(ctx, ipnsFakeID.PrivateKey(), ipnsVal); err != nil {
t.Fatal(err)
}

ipnsKey := IpnsDsKey(ipnsFakeID.ID())

for k := range ds.syncKeys {
if k.IsAncestorOf(ipnsKey) || k.Equal(ipnsKey) {
return
}
}

t.Fatal("ipns key not synced")
}

type checkSyncDS struct {
ds.Datastore
syncKeys map[ds.Key]struct{}
}

func (d *checkSyncDS) Sync(prefix ds.Key) error {
d.syncKeys[prefix] = struct{}{}
return d.Datastore.Sync(prefix)
}