Skip to content

Commit

Permalink
refactor: detailed boolean option and benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Jan 2, 2024
1 parent a73b187 commit dc2f8b0
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 23 deletions.
35 changes: 20 additions & 15 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,16 +665,16 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidDIndex)
func (p *pinner) DirectKeys(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidDIndex, detailed)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidRIndex)
func (p *pinner) RecursiveKeys(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidRIndex, detailed)
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedPin {
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detailed bool) <-chan ipfspinner.StreamedPin {
out := make(chan ipfspinner.StreamedPin)

go func() {
Expand All @@ -692,21 +692,26 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan
return false
}

pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
return false
var pin ipfspinner.Pinned
if detailed {
pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
return false
}

Check warning on line 701 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L699-L701

Added lines #L699 - L701 were not covered by tests

pin.Key = pp.Cid
pin.Mode = pp.Mode
pin.Name = pp.Name
} else {
pin.Key = c
}

if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedPin{Pin: ipfspinner.Pinned{
Key: pp.Cid,
Mode: pp.Mode,
Name: pp.Name,
}}:
case out <- ipfspinner.StreamedPin{Pin: pin}:
}
cidSet.Add(c)
}
Expand All @@ -722,7 +727,7 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedPin {
func (p *pinner) InternalPins(ctx context.Context, detailed bool) <-chan ipfspinner.StreamedPin {
c := make(chan ipfspinner.StreamedPin)
close(c)
return c
Expand Down
46 changes: 43 additions & 3 deletions pinning/pinner/dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

bs "github.com/ipfs/boxo/blockservice"
mdag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/stretchr/testify/require"

cid "github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -210,7 +211,7 @@ func TestPinnerBasic(t *testing.T) {
return pins
}

pins := allPins(p.RecursiveKeys(ctx))
pins := allPins(p.RecursiveKeys(ctx, true))
if len(pins) != 2 {
t.Error("expected 2 recursive pins")
}
Expand Down Expand Up @@ -255,15 +256,15 @@ func TestPinnerBasic(t *testing.T) {
}
}

pins = allPins(p.DirectKeys(ctx))
pins = allPins(p.DirectKeys(ctx, false))
if len(pins) != 1 {
t.Error("expected 1 direct pin")
}
if pins[0].Key != ak {
t.Error("wrong direct pin")
}

pins = allPins(p.InternalPins(ctx))
pins = allPins(p.InternalPins(ctx, false))
if len(pins) != 0 {
t.Error("should not have internal keys")
}
Expand Down Expand Up @@ -1351,3 +1352,42 @@ func verifyIndexValue(ctx context.Context, pinner *pinner, cidKey, expectedPid s
}
return nil
}

func BenchmarkDetails(b *testing.B) {
for count := 128; count <= 16386; count <<= 1 {
b.Run(fmt.Sprint("Keys-NoDetails-", count), func(b *testing.B) {
benchmarkDetails(b, count, false)
})

b.Run(fmt.Sprint("Keys-Details-", count), func(b *testing.B) {
benchmarkDetails(b, count, true)
})
}
}

func benchmarkDetails(b *testing.B, count int, details bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dstore, dserv := makeStore()
pinner, err := New(ctx, dstore, dserv)
require.NoError(b, err)
nodes := makeNodes(count, dserv)

// Pin all the nodes one at a time.
for j := range nodes {
err := pinner.Pin(ctx, nodes[j], true, "")
require.NoError(b, err)

err = pinner.Flush(ctx)
require.NoError(b, err)
}

// Reset the timer and execute actual benchmark.
b.ResetTimer()
for i := 0; i < b.N; i++ {
for val := range pinner.RecursiveKeys(ctx, details) {
require.NoError(b, val.Err)
}
}
}
6 changes: 3 additions & 3 deletions pinning/pinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,14 @@ type Pinner interface {
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) <-chan StreamedPin
DirectKeys(ctx context.Context, detailed bool) <-chan StreamedPin

// RecursiveKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) <-chan StreamedPin
RecursiveKeys(ctx context.Context, detailed bool) <-chan StreamedPin

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) <-chan StreamedPin
InternalPins(ctx context.Context, detailed bool) <-chan StreamedPin
}

// Pinned represents CID which has been pinned with a pinning strategy.
Expand Down
4 changes: 2 additions & 2 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
defer cancel()
defer close(set.New)

for sc := range pinning.DirectKeys(ctx) {
for sc := range pinning.DirectKeys(ctx, false) {

Check warning on line 83 in provider/provider.go

View check run for this annotation

Codecov / codecov/patch

provider/provider.go#L83

Added line #L83 was not covered by tests
if sc.Err != nil {
logR.Errorf("reprovide direct pins: %s", sc.Err)
return
Expand All @@ -89,7 +89,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
}

session := fetchConfig.NewSession(ctx)
for sc := range pinning.RecursiveKeys(ctx) {
for sc := range pinning.RecursiveKeys(ctx, false) {

Check warning on line 92 in provider/provider.go

View check run for this annotation

Codecov / codecov/patch

provider/provider.go#L92

Added line #L92 was not covered by tests
if sc.Err != nil {
logR.Errorf("reprovide recursive pins: %s", sc.Err)
return
Expand Down

0 comments on commit dc2f8b0

Please sign in to comment.