Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose selector traversal options for SelectiveCar #251

Merged
merged 6 commits into from
Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 7 additions & 106 deletions car_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package car
package car_test

import (
"bytes"
Expand All @@ -12,10 +12,7 @@ import (
format "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
"github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
car "github.com/ipld/go-car"
)

func assertAddNodes(t *testing.T, ds format.DAGService, nds ...format.Node) {
Expand Down Expand Up @@ -46,12 +43,12 @@ func TestRoundtrip(t *testing.T) {
assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3)

buf := new(bytes.Buffer)
if err := WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil {
if err := car.WriteCar(context.Background(), dserv, []cid.Cid{nd3.Cid()}, buf); err != nil {
t.Fatal(err)
}

bserv := dstest.Bserv()
ch, err := LoadCar(bserv.Blockstore(), buf)
ch, err := car.LoadCar(bserv.Blockstore(), buf)
if err != nil {
t.Fatal(err)
}
Expand All @@ -77,111 +74,15 @@ func TestRoundtrip(t *testing.T) {
}
}

// TestRoundtripSelective writes a selector-limited part of a small DAG to a CAR
// using both the one-step (Write) and the two-step (Prepare + Dump) APIs,
// checks that both produce identical output and hook calls, and then loads the
// CAR back to confirm which blocks were written.
func TestRoundtripSelective(t *testing.T) {
	sourceBserv := dstest.Bserv()
	sourceBs := sourceBserv.Blockstore()
	dserv := merkledag.NewDAGService(sourceBserv)
	a := merkledag.NewRawNode([]byte("aaaa"))
	b := merkledag.NewRawNode([]byte("bbbb"))
	c := merkledag.NewRawNode([]byte("cccc"))

	nd1 := &merkledag.ProtoNode{}
	nd1.AddNodeLink("cat", a)

	nd2 := &merkledag.ProtoNode{}
	nd2.AddNodeLink("first", nd1)
	nd2.AddNodeLink("dog", b)
	nd2.AddNodeLink("repeat", nd1)

	nd3 := &merkledag.ProtoNode{}
	nd3.AddNodeLink("second", nd2)
	nd3.AddNodeLink("bear", c)

	assertAddNodes(t, dserv, a, b, c, nd1, nd2, nd3)

	ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)

	// the graph assembled above looks as follows, in order:
	// nd3 -> [c, nd2 -> [nd1 -> a, b, nd1 -> a]]
	// this selector starts at nd3, and traverses a link at index 1 (nd2, the second link, zero indexed)
	// it then recursively traverses all of its children
	// the only node skipped is 'c' -- link at index 0 immediately below nd3
	// the purpose is simply to show we are not writing the entire merkledag underneath
	// nd3
	//
	// The local is named sel so it does not shadow the imported selector package.
	sel := ssb.ExploreFields(func(efsb builder.ExploreFieldsSpecBuilder) {
		efsb.Insert("Links",
			ssb.ExploreIndex(1, ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreAll(ssb.ExploreRecursiveEdge()))))
	}).Node()

	sc := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in one step
	buf := new(bytes.Buffer)
	blockCount := 0
	var oneStepBlocks []Block
	err := sc.Write(buf, func(block Block) error {
		oneStepBlocks = append(oneStepBlocks, block)
		blockCount++
		return nil
	})
	// Check the error first so a failed write is reported as such rather than
	// as a block-count mismatch.
	require.NoError(t, err)
	// a, b, nd1, nd2 and nd3 are expected; c is not.
	require.Equal(t, 5, blockCount)

	// create a new builder for two-step write
	sc2 := NewSelectiveCar(context.Background(), sourceBs, []Dag{{Root: nd3.Cid(), Selector: sel}})

	// write car in two steps
	var twoStepBlocks []Block
	scp, err := sc2.Prepare(func(block Block) error {
		twoStepBlocks = append(twoStepBlocks, block)
		return nil
	})
	require.NoError(t, err)
	buf2 := new(bytes.Buffer)
	err = scp.Dump(buf2)
	require.NoError(t, err)

	// verify preparation step correctly assessed length and blocks
	require.Equal(t, uint64(buf.Len()), scp.Size())
	require.Equal(t, blockCount, len(scp.Cids()))

	// verify equal data written by both methods
	require.Equal(t, buf.Bytes(), buf2.Bytes())

	// verify equal blocks were passed to user block hook funcs
	require.Equal(t, oneStepBlocks, twoStepBlocks)

	// read the car back and verify its contents
	bserv := dstest.Bserv()
	ch, err := LoadCar(bserv.Blockstore(), buf)
	require.NoError(t, err)
	require.Len(t, ch.Roots, 1)

	require.True(t, ch.Roots[0].Equals(nd3.Cid()))

	bs := bserv.Blockstore()

	// every selected node must be present in the loaded blockstore
	for _, nd := range []format.Node{a, b, nd1, nd2, nd3} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.True(t, has)
	}

	// the node skipped by the selector must be absent
	for _, nd := range []format.Node{c} {
		has, err := bs.Has(nd.Cid())
		require.NoError(t, err)
		require.False(t, has)
	}
}

func TestEOFHandling(t *testing.T) {
// fixture is a clean single-block, single-root CAR
fixture, err := hex.DecodeString("3aa265726f6f747381d82a58250001711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80b6776657273696f6e012c01711220151fe9e73c6267a7060c6f6c4cca943c236f4b196723489608edb42a8b8fa80ba165646f646779f5")
if err != nil {
t.Fatal(err)
}

load := func(t *testing.T, byts []byte) *CarReader {
cr, err := NewCarReader(bytes.NewReader(byts))
load := func(t *testing.T, byts []byte) *car.CarReader {
cr, err := car.NewCarReader(bytes.NewReader(byts))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -294,7 +195,7 @@ func TestBadHeaders(t *testing.T) {
if err != nil {
t.Fatal(err)
}
_, err = NewCarReader(bytes.NewReader(fixture))
_, err = car.NewCarReader(bytes.NewReader(fixture))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-merkledag v0.3.2
github.com/ipld/go-codec-dagpb v1.2.0
github.com/ipld/go-ipld-prime v0.9.0
github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569
github.com/multiformats/go-multihash v0.0.15
github.com/stretchr/testify v1.7.0
)
Expand Down
7 changes: 6 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-codec-dagpb v1.2.0 h1:2umV7ud8HBMkRuJgd8gXw95cLhwmcYrihS3cQEy9zpI=
github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s=
github.com/ipld/go-ipld-prime v0.9.0 h1:N2OjJMb+fhyFPwPnVvJcWU/NsumP8etal+d2v3G4eww=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569 h1:UDHkozLpTefhQzyu/2BWVRvsFHjhzvL387KsfFqE1vc=
github.com/ipld/go-ipld-prime v0.12.3-0.20210930132912-0b3aef3ca569/go.mod h1:PaeLYq8k6dJLmDUSLrzkEpoGV4PEfe/1OtFN/eALOc8=
github.com/jackpal/gateway v1.0.5 h1:qzXWUJfuMdlLMtt0a3Dgt+xkWQiA5itDEITVJtuSwMc=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1 h1:i0LektDkO1QlrTm/cSuP+PyBCDnYvjPLGl4LdWEMiaA=
Expand Down Expand Up @@ -237,6 +238,8 @@ github.com/multiformats/go-multiaddr-net v0.0.1/go.mod h1:nw6HSxNmCIQH27XPGBuX+d
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multibase v0.0.3 h1:l/B6bJDQjvQ5G52jw4QGSYeOTZoAwIO77RblWplfIqk=
github.com/multiformats/go-multibase v0.0.3/go.mod h1:5+1R4eQrT3PkYZ24C3W2Ue2tPwIdYQD509ZjSb5y9Oc=
github.com/multiformats/go-multicodec v0.3.0 h1:tstDwfIjiHbnIjeM5Lp+pMrSeN+LCMsEwOrkPmWm03A=
github.com/multiformats/go-multicodec v0.3.0/go.mod h1:qGGaQmioCDh+TeFOnxrbU0DaIPw8yFgAZgFG0V7p1qQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
Expand Down Expand Up @@ -279,6 +282,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/warpfork/go-testmark v0.3.0 h1:Q81c4u7hT+BR5kNfNQhEF0VT2pmL7+Kk0wD+ORYl7iA=
github.com/warpfork/go-testmark v0.3.0/go.mod h1:jhEf8FVxd+F17juRubpmut64NEG6I2rgkUhlcqqXwE0=
github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w=
github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
Expand Down
56 changes: 56 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package car

import "math"

// Options holds the configured options after applying a number of
rvagg marked this conversation as resolved.
Show resolved Hide resolved
// Option funcs.
//
// This type should not be used directly by end users; it's only exposed as a
// side effect of Option.
type Options struct {
// TraverseLinksOnlyOnce is handed to the traversal config as
// LinkVisitOnlyOnce. ApplyOptions defaults it to false.
TraverseLinksOnlyOnce bool
// MaxTraversalLinks is the link limit for a selector traversal.
// ApplyOptions defaults it to math.MaxInt64; traverseBlocks only installs
// a traversal budget when the value is below that default.
MaxTraversalLinks uint64
}

// Option is a functional option: a function that mutates an Options value.
// Options are accepted by NewSelectiveCar and are applied on top of the
// defaults by ApplyOptions.
type Option func(*Options)

// TraverseLinksOnlyOnce returns an Option that stops the traversal engine from
// visiting the same link more than once.
//
// For an exhaustive selector, where repeat visits are known not to affect the
// completeness of execution, this can be an efficient strategy. Use it with
// caution with most other selectors: revisiting a link for a different reason
// during selector execution can be valid and necessary for a full traversal.
func TraverseLinksOnlyOnce() Option {
	return func(o *Options) { o.TraverseLinksOnlyOnce = true }
}

// MaxTraversalLinks returns an Option that sets how many links a selector
// traversal is allowed to execute before it fails.
//
// Note that setting this option may cause selector execution to return an
// error when building a SelectiveCar.
func MaxTraversalLinks(MaxTraversalLinks uint64) Option {
	limit := MaxTraversalLinks
	return func(o *Options) {
		o.MaxTraversalLinks = limit
	}
}

// ApplyOptions applies given opts and returns the resulting Options.
rvagg marked this conversation as resolved.
Show resolved Hide resolved
// This function should not be used directly by end users; it's only exposed as a
// side effect of Option.
rvagg marked this conversation as resolved.
Show resolved Hide resolved
func ApplyOptions(opt ...Option) Options {
opts := Options{
TraverseLinksOnlyOnce: false, // default: recurse until exhausted
masih marked this conversation as resolved.
Show resolved Hide resolved
MaxTraversalLinks: math.MaxInt64, // default: traverse all
}
for _, o := range opt {
o(&opts)
}
return opts
}
28 changes: 28 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package car_test

import (
"math"
"testing"

car "github.com/ipld/go-car"
"github.com/stretchr/testify/require"
)

func TestApplyOptions_SetsExpectedDefaults(t *testing.T) {
require.Equal(t, car.Options{
MaxTraversalLinks: math.MaxInt64,
TraverseLinksOnlyOnce: false,
}, car.ApplyOptions())
}

func TestApplyOptions_AppliesOptions(t *testing.T) {
require.Equal(t,
car.Options{
MaxTraversalLinks: 123,
TraverseLinksOnlyOnce: true,
},
car.ApplyOptions(
car.MaxTraversalLinks(123),
car.TraverseLinksOnlyOnce(),
))
}
18 changes: 14 additions & 4 deletions selectivecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"math"

cid "github.com/ipfs/go-cid"
util "github.com/ipld/go-car/util"
Expand Down Expand Up @@ -40,6 +41,7 @@ type SelectiveCar struct {
ctx context.Context
dags []Dag
store ReadStore
opts Options
}

// OnCarHeaderFunc is called during traversal when the header is created
Expand All @@ -61,16 +63,16 @@ type SelectiveCarPrepared struct {

// NewSelectiveCar creates a new SelectiveCar for the given car file based on
// a block store and a set of root+selector pairs.
func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag) SelectiveCar {
func NewSelectiveCar(ctx context.Context, store ReadStore, dags []Dag, opts ...Option) SelectiveCar {
return SelectiveCar{
ctx: ctx,
store: store,
dags: dags,
opts: ApplyOptions(opts...),
}
}

func (sc SelectiveCar) traverse(onCarHeader OnCarHeaderFunc, onNewCarBlock OnNewCarBlockFunc) (uint64, error) {

traverser := &selectiveCarTraverser{onCarHeader, onNewCarBlock, 0, cid.NewSet(), sc, cidlink.DefaultLinkSystem()}
traverser.lsys.StorageReadOpener = traverser.loader
return traverser.traverse()
Expand Down Expand Up @@ -264,13 +266,21 @@ func (sct *selectiveCarTraverser) traverseBlocks() error {
if err != nil {
return err
}
err = traversal.Progress{
prog := traversal.Progress{
Cfg: &traversal.Config{
Ctx: sct.sc.ctx,
LinkSystem: sct.lsys,
LinkTargetNodePrototypeChooser: nsc,
LinkVisitOnlyOnce: sct.sc.opts.TraverseLinksOnlyOnce,
},
}.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
}
if sct.sc.opts.MaxTraversalLinks < math.MaxInt64 {
prog.Budget = &traversal.Budget{
NodeBudget: math.MaxInt64,
LinkBudget: int64(sct.sc.opts.MaxTraversalLinks),
}
}
err = prog.WalkAdv(nd, parsed, func(traversal.Progress, ipld.Node, traversal.VisitReason) error { return nil })
if err != nil {
return err
}
Expand Down
Loading