Skip to content

Commit

Permalink
support async datastores
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Dec 16, 2019
1 parent b7f03a5 commit 7e75734
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 8 deletions.
34 changes: 33 additions & 1 deletion core/coreapi/unixfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
blockservice "github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
cidutil "github.com/ipfs/go-cidutil"
filestore "github.com/ipfs/go-filestore"
bstore "github.com/ipfs/go-ipfs-blockstore"
files "github.com/ipfs/go-ipfs-files"
ipld "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -96,7 +97,29 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv)

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv)
// add a sync call to the DagService
// this ensures that data written to the DagService is persisted to the underlying datastore
// TODO: propagate the Sync function from the datastore through the blockstore, blockservice and dagservice
var syncDserv *syncDagService
if settings.OnlyHash {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func(c cid.Cid) error { return nil },
}
} else {
syncDserv = &syncDagService{
DAGService: dserv,
syncFn: func(c cid.Cid) error {
ds := api.repo.Datastore()
if err := ds.Sync(bstore.BlockPrefix.ChildString(c.String())); err != nil {
return err
}
return ds.Sync(filestore.FilestorePrefix)
},
}
}

fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, syncDserv)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,3 +295,12 @@ func (api *UnixfsAPI) lsFromLinks(ctx context.Context, ndlinks []*ipld.Link, set
func (api *UnixfsAPI) core() *CoreAPI {
return (*CoreAPI)(api)
}

type syncDagService struct {
ipld.DAGService
syncFn func(c cid.Cid) error
}

func (s *syncDagService) Sync(c cid.Cid) error {
return s.syncFn(c)
}
11 changes: 11 additions & 0 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ type Link struct {
Size uint64
}

type syncer interface {
Sync(c cid.Cid) error
}

// NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds)
Expand Down Expand Up @@ -316,6 +320,13 @@ func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
return nil, err
}

if asyncDagService, ok := adder.dagService.(syncer); ok {
err = asyncDagService.Sync(nd.Cid())
if err != nil {
return nil, err
}
}

if !adder.Pin {
return nd, nil
}
Expand Down
33 changes: 30 additions & 3 deletions core/node/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-filestore"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
"github.com/ipfs/go-ipfs-exchange-offline"
Expand Down Expand Up @@ -41,18 +42,33 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
// Pinning creates new pinner which tells GC which blocks should be kept
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
internalDag := merkledag.NewDAGService(blockservice.New(bstore, offline.Exchange(bstore)))
pinning, err := pin.LoadPinner(repo.Datastore(), ds, internalDag)
rootDS := repo.Datastore()

syncFn := func() error { return rootDS.Sync(blockstore.BlockPrefix) }
syncDs := &syncDagService{ds, syncFn}
syncInternalDag := &syncDagService{internalDag, syncFn}

pinning, err := pin.LoadPinner(rootDS, syncDs, syncInternalDag)
if err != nil {
// TODO: we should move towards only running 'NewPinner' explicitly on
// node init instead of implicitly here as a result of the pinner keys
// not being found in the datastore.
// this is kinda sketchy and could cause data loss
pinning = pin.NewPinner(repo.Datastore(), ds, internalDag)
pinning = pin.NewPinner(rootDS, syncDs, syncInternalDag)
}

return pinning, nil
}

type syncDagService struct {
format.DAGService
syncFn func() error
}

func (s *syncDagService) Sync() error {
return s.syncFn()
}

// Dag creates new DAGService
func Dag(bs blockservice.BlockService) format.DAGService {
return merkledag.NewDAGService(bs)
Expand All @@ -77,7 +93,18 @@ func OnlineExchange(provide bool) interface{} {
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
dsk := datastore.NewKey("/local/filesroot")
pf := func(ctx context.Context, c cid.Cid) error {
return repo.Datastore().Put(dsk, c.Bytes())
rootDS := repo.Datastore()
if err := rootDS.Sync(blockstore.BlockPrefix.ChildString(c.String())); err != nil {
return err
}
if err := rootDS.Sync(filestore.FilestorePrefix); err != nil {
return err
}

if err := rootDS.Put(dsk, c.Bytes()); err != nil {
return err
}
return rootDS.Sync(dsk)
}

var nd *merkledag.ProtoNode
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.4
github.com/ipfs/go-ipfs-pinner v0.0.2
github.com/ipfs/go-ipfs-pinner v0.0.3
github.com/ipfs/go-ipfs-posinfo v0.0.1
github.com/ipfs/go-ipfs-provider v0.3.0
github.com/ipfs/go-ipfs-routing v0.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjN
github.com/ipfs/go-ipfs-files v0.0.4 h1:WzRCivcybUQch/Qh6v8LBRhKtRsjnwyiuOV09mK7mrE=
github.com/ipfs/go-ipfs-files v0.0.4/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-flags v0.0.1/go.mod h1:RnXBb9WV53GSfTrSDVK61NLTFKvWc60n+K9EgCDh+rA=
github.com/ipfs/go-ipfs-pinner v0.0.2 h1:KRXt2V0TzoTd3mO1aONSw8C9wnZtl7RLpPruN/XDnlQ=
github.com/ipfs/go-ipfs-pinner v0.0.2/go.mod h1:KZGyGAR+yLthGEkG9tuA2zweB7O6auXaJNjX6IbEbOs=
github.com/ipfs/go-ipfs-pinner v0.0.3 h1:ez/yNYYyH1W7DiCF/L29tmp6L7lBO8eqbJtPi2pHicA=
github.com/ipfs/go-ipfs-pinner v0.0.3/go.mod h1:s4kFZWLWGDudN8Jyd/GTpt222A12C2snA2+OTdy/7p8=
github.com/ipfs/go-ipfs-posinfo v0.0.1 h1:Esoxj+1JgSjX0+ylc0hUmJCOv6V2vFoZiETLR6OtpRs=
github.com/ipfs/go-ipfs-posinfo v0.0.1/go.mod h1:SwyeVP+jCwiDu0C313l/8jg6ZxM0qqtlt2a0vILTc1A=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
Expand Down
6 changes: 5 additions & 1 deletion namesys/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func (p *IpnsPublisher) updateRecord(ctx context.Context, k ci.PrivKey, value pa
}

// Put the new record.
if err := p.ds.Put(IpnsDsKey(id), data); err != nil {
key := IpnsDsKey(id)
if err := p.ds.Put(key, data); err != nil {
return nil, err
}
if err := p.ds.Sync(key); err != nil {
return nil, err
}
return entry, nil
Expand Down
43 changes: 43 additions & 0 deletions namesys/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package namesys
import (
"context"
"crypto/rand"
"github.com/ipfs/go-path"
"testing"
"time"

Expand Down Expand Up @@ -110,3 +111,45 @@ func TestRSAPublisher(t *testing.T) {
func TestEd22519Publisher(t *testing.T) {
testNamekeyPublisher(t, ci.Ed25519, ds.ErrNotFound, false)
}

func TestAsyncDS(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rt := mockrouting.NewServer().Client(testutil.RandIdentityOrFatal(t))
ds := &checkSyncDS{
Datastore: ds.NewMapDatastore(),
syncKeys: make(map[ds.Key]struct{}),
}
publisher := NewIpnsPublisher(rt, ds)

ipnsFakeID := testutil.RandIdentityOrFatal(t)
ipnsVal, err := path.ParsePath("/ipns/foo.bar")
if err != nil {
t.Fatal(err)
}

if err := publisher.Publish(ctx, ipnsFakeID.PrivateKey(), ipnsVal); err != nil {
t.Fatal(err)
}

ipnsKey := IpnsDsKey(ipnsFakeID.ID())

for k := range ds.syncKeys {
if k.IsAncestorOf(ipnsKey) || k.Equal(ipnsKey) {
return
}
}

t.Fatal("ipns key not synced")
}

type checkSyncDS struct {
ds.Datastore
syncKeys map[ds.Key]struct{}
}

func (d *checkSyncDS) Sync(prefix ds.Key) error {
d.syncKeys[prefix] = struct{}{}
return d.Datastore.Sync(prefix)
}

0 comments on commit 7e75734

Please sign in to comment.