Skip to content

Commit

Permalink
feat(pinning): add support for labels
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Dec 15, 2023
1 parent 483bc39 commit 3d1f598
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 53 deletions.
51 changes: 32 additions & 19 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,20 @@ func (p *pinner) SetAutosync(auto bool) bool {
}

// Pin the given node, optionally recursive
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error {
func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, label string) error {
err := p.dserv.Add(ctx, node)
if err != nil {
return err
}

if recurse {
return p.doPinRecursive(ctx, node.Cid(), true)
return p.doPinRecursive(ctx, node.Cid(), true, label)
} else {
return p.doPinDirect(ctx, node.Cid())
return p.doPinDirect(ctx, node.Cid(), label)
}
}

func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error {
func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool, label string) error {
cidKey := c.KeyString()

p.lock.Lock()
Expand Down Expand Up @@ -243,14 +243,14 @@ func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) erro
}
}

_, err = p.addPin(ctx, c, ipfspinner.Recursive, "")
_, err = p.addPin(ctx, c, ipfspinner.Recursive, label)
if err != nil {
return err
}
return p.flushPins(ctx, false)
}

func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid, label string) error {
cidKey := c.KeyString()

p.lock.Lock()
Expand All @@ -264,7 +264,7 @@ func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error {
return fmt.Errorf("%s already pinned recursively", c.String())
}

_, err = p.addPin(ctx, c, ipfspinner.Direct, "")
_, err = p.addPin(ctx, c, ipfspinner.Direct, label)
if err != nil {
return err
}
Expand Down Expand Up @@ -665,17 +665,17 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) {
}

// DirectKeys returns a slice containing the directly pinned keys
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidDIndex)
}

// RecursiveKeys returns a slice containing the recursively pinned keys
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid {
func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedPin {
return p.streamIndex(ctx, p.cidRIndex)
}

func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid {
out := make(chan ipfspinner.StreamedCid)
func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedPin {
out := make(chan ipfspinner.StreamedPin)

go func() {
defer close(out)
Expand All @@ -688,21 +688,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan
err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
out <- ipfspinner.StreamedPin{Err: err}

Check warning on line 691 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L691

Added line #L691 was not covered by tests
return false
}

pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
return false
}

Check warning on line 699 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L697-L699

Added lines #L697 - L699 were not covered by tests

if !cidSet.Has(c) {
select {
case <-ctx.Done():
return false
case out <- ipfspinner.StreamedCid{C: c}:
case out <- ipfspinner.StreamedPin{Pin: ipfspinner.Pinned{
Key: pp.Cid,
Mode: pp.Mode,
Label: pp.Name,
}}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedCid{Err: err}
out <- ipfspinner.StreamedPin{Err: err}

Check warning on line 716 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L716

Added line #L716 was not covered by tests
}
}()

Expand All @@ -711,8 +722,8 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid {
c := make(chan ipfspinner.StreamedCid)
func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedPin {
c := make(chan ipfspinner.StreamedPin)
close(c)
return c
}
Expand Down Expand Up @@ -756,6 +767,8 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error
return err
}

// TODO: get old pin label

_, err = p.addPin(ctx, to, ipfspinner.Recursive, "")
if err != nil {
return err
Expand Down Expand Up @@ -809,13 +822,13 @@ func (p *pinner) Flush(ctx context.Context) error {

// PinWithMode allows the user to have fine grained control over pin
// counts
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error {
func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, label string) error {
// TODO: remove his to support multiple pins per CID
switch mode {
case ipfspinner.Recursive:
return p.doPinRecursive(ctx, c, false)
return p.doPinRecursive(ctx, c, false, label)
case ipfspinner.Direct:
return p.doPinDirect(ctx, c)
return p.doPinDirect(ctx, c, label)

Check warning on line 831 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L831

Added line #L831 was not covered by tests
default:
return errors.New("unrecognized pin mode")
}
Expand Down
40 changes: 20 additions & 20 deletions pinning/pinner/dspinner/pin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestPinnerBasic(t *testing.T) {
}

// Pin A{}
err = p.Pin(ctx, a, false)
err = p.Pin(ctx, a, false, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestPinnerBasic(t *testing.T) {
bk := b.Cid()

// recursively pin B{A,C}
err = p.Pin(ctx, b, true)
err = p.Pin(ctx, b, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -191,20 +191,20 @@ func TestPinnerBasic(t *testing.T) {
}

// Add D{A,C,E}
err = p.Pin(ctx, d, true)
err = p.Pin(ctx, d, true, "")
if err != nil {
t.Fatal(err)
}

dk := d.Cid()
assertPinned(t, p, dk, "pinned node not found.")

allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) {
allCids := func(ch <-chan ipfspin.StreamedPin) (cids []cid.Cid) {
for val := range ch {
if val.Err != nil {
t.Fatal(val.Err)
}
cids = append(cids, val.C)
cids = append(cids, val.Pin.Key)
}
return cids
}
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestPinnerBasic(t *testing.T) {
fakeLog := &fakeLogger{}
fakeLog.StandardLogger = log
log = fakeLog
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -457,19 +457,19 @@ func TestDuplicateSemantics(t *testing.T) {
}

// pin is recursively
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}

// pinning directly should fail
err = p.Pin(ctx, a, false)
err = p.Pin(ctx, a, false, "")
if err == nil {
t.Fatal("expected direct pin to fail")
}

// pinning recursively again should succeed
err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand All @@ -489,7 +489,7 @@ func TestFlush(t *testing.T) {
}
_, k := randNode()

p.PinWithMode(ctx, k, ipfspin.Recursive)
p.PinWithMode(ctx, k, ipfspin.Recursive, "")
if err = p.Flush(ctx); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -520,7 +520,7 @@ func TestPinRecursiveFail(t *testing.T) {
mctx, cancel := context.WithTimeout(ctx, time.Millisecond)
defer cancel()

err = p.Pin(mctx, a, true)
err = p.Pin(mctx, a, true, "")
if err == nil {
t.Fatal("should have failed to pin here")
}
Expand All @@ -538,7 +538,7 @@ func TestPinRecursiveFail(t *testing.T) {
// this one is time based... but shouldnt cause any issues
mctx, cancel = context.WithTimeout(ctx, time.Second)
defer cancel()
err = p.Pin(mctx, a, true)
err = p.Pin(mctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -568,7 +568,7 @@ func TestPinUpdate(t *testing.T) {
t.Fatal(err)
}

if err = p.Pin(ctx, n1, true); err != nil {
if err = p.Pin(ctx, n1, true, ""); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -646,7 +646,7 @@ func TestLoadDirty(t *testing.T) {

_, bk := randNode()

err = p.Pin(ctx, a, true)
err = p.Pin(ctx, a, true, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -787,7 +787,7 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs
}

// Pin last A recursively
if err = p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil {
if err = p.Pin(ctx, aNodes[aBranchLen-1], true, ""); err != nil {
return
}

Expand Down Expand Up @@ -820,12 +820,12 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs
bk = b.Cid()

// Pin C recursively
if err = p.Pin(ctx, c, true); err != nil {
if err = p.Pin(ctx, c, true, ""); err != nil {
return
}

// Pin B recursively
if err = p.Pin(ctx, b, true); err != nil {
if err = p.Pin(ctx, b, true, ""); err != nil {
return
}

Expand Down Expand Up @@ -857,7 +857,7 @@ func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) {
var err error

for i := range nodes {
err = p.Pin(ctx, nodes[i], recursive)
err = p.Pin(ctx, nodes[i], recursive, "")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -975,7 +975,7 @@ func benchmarkNthPin(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.
which := count - 1
for i := 0; i < b.N; i++ {
// Pin the Nth node and Flush
err := pinner.Pin(ctx, nodes[which], true)
err := pinner.Pin(ctx, nodes[which], true, "")
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -1021,7 +1021,7 @@ func benchmarkNPins(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.D
for i := 0; i < b.N; i++ {
// Pin all the nodes one at a time.
for j := range nodes {
err := pinner.Pin(ctx, nodes[j], true)
err := pinner.Pin(ctx, nodes[j], true, "")
if err != nil {
panic(err)
}
Expand Down
23 changes: 12 additions & 11 deletions pinning/pinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Pinner interface {
// Pin the given node, optionally recursively.
// Pin will make sure that the given node and its children if recursive is set
// are stored locally.
Pin(ctx context.Context, node ipld.Node, recursive bool) error
Pin(ctx context.Context, node ipld.Node, recursive bool, label string) error

// Unpin the given cid. If recursive is true, removes either a recursive or
// a direct pin. If recursive is false, only removes a direct pin.
Expand All @@ -114,30 +114,31 @@ type Pinner interface {
// PinWithMode is for manually editing the pin structure. Use with
// care! If used improperly, garbage collection may not be
// successful.
PinWithMode(context.Context, cid.Cid, Mode) error
PinWithMode(context.Context, cid.Cid, Mode, string) error

// Flush writes the pin state to the backing datastore
Flush(ctx context.Context) error

// DirectKeys returns all directly pinned cids
DirectKeys(ctx context.Context) <-chan StreamedCid
DirectKeys(ctx context.Context) <-chan StreamedPin

// RecursiveKeys returns all recursively pinned cids
RecursiveKeys(ctx context.Context) <-chan StreamedCid
RecursiveKeys(ctx context.Context) <-chan StreamedPin

// InternalPins returns all cids kept pinned for the internal state of the
// pinner
InternalPins(ctx context.Context) <-chan StreamedCid
InternalPins(ctx context.Context) <-chan StreamedPin
}

// Pinned represents CID which has been pinned with a pinning strategy.
// The Via field allows to identify the pinning parent of this CID, in the
// case that the item is not pinned directly (but rather pinned recursively
// by some ascendant).
type Pinned struct {
Key cid.Cid
Mode Mode
Via cid.Cid
Key cid.Cid
Mode Mode
Label string
Via cid.Cid
}

// Pinned returns whether or not the given cid is pinned
Expand All @@ -158,8 +159,8 @@ func (p Pinned) String() string {
}
}

// StreamedCid encapsulate a Cid and an error for a function to return a channel of Cids.
type StreamedCid struct {
C cid.Cid
// StreamedPin encapsulate a [Pin] and an error for a function to return a channel of [Pin]s.
type StreamedPin struct {
Pin Pinned
Err error
}
6 changes: 3 additions & 3 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
logR.Errorf("reprovide direct pins: %s", sc.Err)
return
}
set.Visitor(ctx)(sc.C)
set.Visitor(ctx)(sc.Pin.Key)

Check warning on line 88 in provider/provider.go

View check run for this annotation

Codecov / codecov/patch

provider/provider.go#L88

Added line #L88 was not covered by tests
}

session := fetchConfig.NewSession(ctx)
Expand All @@ -94,9 +94,9 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory
logR.Errorf("reprovide recursive pins: %s", sc.Err)
return
}
set.Visitor(ctx)(sc.C)
set.Visitor(ctx)(sc.Pin.Key)

Check warning on line 97 in provider/provider.go

View check run for this annotation

Codecov / codecov/patch

provider/provider.go#L97

Added line #L97 was not covered by tests
if !onlyRoots {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.C}, func(res fetcher.FetchResult) error {
err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.Pin.Key}, func(res fetcher.FetchResult) error {

Check warning on line 99 in provider/provider.go

View check run for this annotation

Codecov / codecov/patch

provider/provider.go#L99

Added line #L99 was not covered by tests
clink, ok := res.LastBlockLink.(cidlink.Link)
if ok {
set.Visitor(ctx)(clink.Cid)
Expand Down

0 comments on commit 3d1f598

Please sign in to comment.