Skip to content

Commit

Permalink
Refactor EnumerateChildrenAsync to take in a function to get the links.
Browse files Browse the repository at this point in the history
For now it is always called with the helper function GetLinksDirect to
avoid any change in behaviour.

License: MIT
Signed-off-by: Kevin Atkinson <[email protected]>
  • Loading branch information
kevina committed Feb 21, 2017
1 parent 063b09a commit f778076
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer
for _, c := range cids {
kset := cid.NewSet()

err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit)
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit)
if err != nil {
return err
}
Expand Down
31 changes: 22 additions & 9 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,21 @@ func (n *dagService) Remove(nd node.Node) error {
return n.Blocks.DeleteBlock(nd)
}

// get the links for a node, from the node, bypassing the
// LinkService
func GetLinksDirect(serv DAGService) GetLinks {
return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) {
node, err := serv.Get(ctx, c)
if err != nil {
return nil, err
}
return node.Links(), nil
}
}

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit)
func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error {
return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit)
}

// FindLinks searches this nodes links for the given key,
Expand Down Expand Up @@ -367,10 +379,11 @@ func (t *Batch) Commit() error {
return err
}

type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)

// EnumerateChildren will walk the dag below the given root node and add all
// unseen children to the passed in set.
// TODO: parallelize to avoid disk latency perf hits?
type GetLinks func(context.Context, *cid.Cid) ([]*node.Link, error)
func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error {
links, err := getLinks(ctx, root)
if err != nil {
Expand All @@ -392,9 +405,9 @@ func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, vi
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 8

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error {
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error {
feed := make(chan *cid.Cid)
out := make(chan node.Node)
out := make(chan []*node.Link)
done := make(chan struct{})

var setlk sync.Mutex
Expand All @@ -407,7 +420,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
for i := 0; i < FetchGraphConcurrency; i++ {
go func() {
for ic := range feed {
n, err := ds.Get(ctx, ic)
links, err := getLinks(ctx, ic)
if err != nil {
errChan <- err
return
Expand All @@ -419,7 +432,7 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi

if unseen {
select {
case out <- n:
case out <- links:
case <-fetchersCtx.Done():
return
}
Expand Down Expand Up @@ -454,8 +467,8 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visi
if inProgress == 0 && next == nil {
return nil
}
case nd := <-out:
for _, lnk := range nd.Links() {
case links := <-out:
for _, lnk := range links {
if next == nil {
next = lnk.Cid
send = feed
Expand Down
2 changes: 1 addition & 1 deletion merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func TestEnumerateAsyncFailsNotFound(t *testing.T) {
}

cset := cid.NewSet()
err = EnumerateChildrenAsync(context.Background(), ds, pcid, cset.Visit)
err = EnumerateChildrenAsync(context.Background(), GetLinksDirect(ds), pcid, cset.Visit)
if err == nil {
t.Fatal("this should have failed")
}
Expand Down

0 comments on commit f778076

Please sign in to comment.