From 3d1f5985dba0e175e59fd4824837811b0ab84d5d Mon Sep 17 00:00:00 2001 From: Henrique Dias Date: Fri, 15 Dec 2023 10:20:38 +0100 Subject: [PATCH] feat(pinning): add support for labels --- pinning/pinner/dspinner/pin.go | 51 ++++++++++++++++++----------- pinning/pinner/dspinner/pin_test.go | 40 +++++++++++----------- pinning/pinner/pin.go | 23 ++++++------- provider/provider.go | 6 ++-- 4 files changed, 67 insertions(+), 53 deletions(-) diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index 5b0a1ef7f..18cf9ba43 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -174,20 +174,20 @@ func (p *pinner) SetAutosync(auto bool) bool { } // Pin the given node, optionally recursive -func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { +func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool, label string) error { err := p.dserv.Add(ctx, node) if err != nil { return err } if recurse { - return p.doPinRecursive(ctx, node.Cid(), true) + return p.doPinRecursive(ctx, node.Cid(), true, label) } else { - return p.doPinDirect(ctx, node.Cid()) + return p.doPinDirect(ctx, node.Cid(), label) } } -func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) error { +func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool, label string) error { cidKey := c.KeyString() p.lock.Lock() @@ -243,14 +243,14 @@ func (p *pinner) doPinRecursive(ctx context.Context, c cid.Cid, fetch bool) erro } } - _, err = p.addPin(ctx, c, ipfspinner.Recursive, "") + _, err = p.addPin(ctx, c, ipfspinner.Recursive, label) if err != nil { return err } return p.flushPins(ctx, false) } -func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error { +func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid, label string) error { cidKey := c.KeyString() p.lock.Lock() @@ -264,7 +264,7 @@ func (p *pinner) doPinDirect(ctx context.Context, c cid.Cid) error { return fmt.Errorf("%s already pinned recursively", c.String()) } - _, err = p.addPin(ctx, c, ipfspinner.Direct, "") + _, err = p.addPin(ctx, c, ipfspinner.Direct, label) if err != nil { return err } @@ -665,17 +665,17 @@ func (p *pinner) loadPin(ctx context.Context, pid string) (*pin, error) { } // DirectKeys returns a slice containing the directly pinned keys -func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) DirectKeys(ctx context.Context) <-chan ipfspinner.StreamedPin { return p.streamIndex(ctx, p.cidDIndex) } // RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedCid { +func (p *pinner) RecursiveKeys(ctx context.Context) <-chan ipfspinner.StreamedPin { return p.streamIndex(ctx, p.cidRIndex) } -func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedCid { - out := make(chan ipfspinner.StreamedCid) +func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan ipfspinner.StreamedPin { + out := make(chan ipfspinner.StreamedPin) go func() { defer close(out) @@ -688,21 +688,32 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan err := index.ForEach(ctx, "", func(key, value string) bool { c, err := cid.Cast([]byte(key)) if err != nil { - out <- ipfspinner.StreamedCid{Err: err} + out <- ipfspinner.StreamedPin{Err: err} return false } + + pp, err := p.loadPin(ctx, value) + if err != nil { + out <- ipfspinner.StreamedPin{Err: err} + return false + } + if !cidSet.Has(c) { select { case <-ctx.Done(): return false - case out <- ipfspinner.StreamedCid{C: c}: + case out <- ipfspinner.StreamedPin{Pin: ipfspinner.Pinned{ + Key: pp.Cid, + Mode: pp.Mode, + Label: pp.Name, + }}: } cidSet.Add(c) } return true }) if err != nil { - out <- ipfspinner.StreamedCid{Err: err} + out <- ipfspinner.StreamedPin{Err: err} } }() @@ -711,8 +722,8 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer) <-chan // InternalPins returns all cids kept pinned for the internal state of the // pinner -func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedCid { - c := make(chan ipfspinner.StreamedCid) +func (p *pinner) InternalPins(ctx context.Context) <-chan ipfspinner.StreamedPin { + c := make(chan ipfspinner.StreamedPin) close(c) return c } @@ -756,6 +767,8 @@ func (p *pinner) Update(ctx context.Context, from, to cid.Cid, unpin bool) error return err } + // TODO: get old pin label + _, err = p.addPin(ctx, to, ipfspinner.Recursive, "") if err != nil { return err @@ -809,13 +822,13 @@ func (p *pinner) Flush(ctx context.Context) error { // PinWithMode allows the user to have fine grained control over pin // counts -func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode) error { +func (p *pinner) PinWithMode(ctx context.Context, c cid.Cid, mode ipfspinner.Mode, label string) error { // TODO: remove his to support multiple pins per CID switch mode { case ipfspinner.Recursive: - return p.doPinRecursive(ctx, c, false) + return p.doPinRecursive(ctx, c, false, label) case ipfspinner.Direct: - return p.doPinDirect(ctx, c) + return p.doPinDirect(ctx, c, label) default: return errors.New("unrecognized pin mode") } diff --git a/pinning/pinner/dspinner/pin_test.go b/pinning/pinner/dspinner/pin_test.go index ad8284480..dec800b2c 100644 --- a/pinning/pinner/dspinner/pin_test.go +++ b/pinning/pinner/dspinner/pin_test.go @@ -120,7 +120,7 @@ func TestPinnerBasic(t *testing.T) { } // Pin A{} - err = p.Pin(ctx, a, false) + err = p.Pin(ctx, a, false, "") if err != nil { t.Fatal(err) } @@ -154,7 +154,7 @@ func TestPinnerBasic(t *testing.T) { bk := b.Cid() // recursively pin B{A,C} - err = p.Pin(ctx, b, true) + err = p.Pin(ctx, b, true, "") if err != nil { t.Fatal(err) } @@ -191,7 +191,7 @@ func TestPinnerBasic(t *testing.T) { } // Add D{A,C,E} - err = p.Pin(ctx, d, true) + err = p.Pin(ctx, d, true, "") if err != nil { t.Fatal(err) } @@ -199,12 +199,12 @@ func TestPinnerBasic(t *testing.T) { dk := d.Cid() assertPinned(t, p, dk, "pinned node not found.") - allCids := func(ch <-chan ipfspin.StreamedCid) (cids []cid.Cid) { + allCids := func(ch <-chan ipfspin.StreamedPin) (cids []cid.Cid) { for val := range ch { if val.Err != nil { t.Fatal(val.Err) } - cids = append(cids, val.C) + cids = append(cids, val.Pin.Key) } return cids } @@ -323,7 +323,7 @@ func TestPinnerBasic(t *testing.T) { fakeLog := &fakeLogger{} fakeLog.StandardLogger = log log = fakeLog - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -457,19 +457,19 @@ func TestDuplicateSemantics(t *testing.T) { } // pin is recursively - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } // pinning directly should fail - err = p.Pin(ctx, a, false) + err = p.Pin(ctx, a, false, "") if err == nil { t.Fatal("expected direct pin to fail") } // pinning recursively again should succeed - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -489,7 +489,7 @@ func TestFlush(t *testing.T) { } _, k := randNode() - p.PinWithMode(ctx, k, ipfspin.Recursive) + p.PinWithMode(ctx, k, ipfspin.Recursive, "") if err = p.Flush(ctx); err != nil { t.Fatal(err) } @@ -520,7 +520,7 @@ func TestPinRecursiveFail(t *testing.T) { mctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() - err = p.Pin(mctx, a, true) + err = p.Pin(mctx, a, true, "") if err == nil { t.Fatal("should have failed to pin here") } @@ -538,7 +538,7 @@ func TestPinRecursiveFail(t *testing.T) { // this one is time based... but shouldnt cause any issues mctx, cancel = context.WithTimeout(ctx, time.Second) defer cancel() - err = p.Pin(mctx, a, true) + err = p.Pin(mctx, a, true, "") if err != nil { t.Fatal(err) } @@ -568,7 +568,7 @@ func TestPinUpdate(t *testing.T) { t.Fatal(err) } - if err = p.Pin(ctx, n1, true); err != nil { + if err = p.Pin(ctx, n1, true, ""); err != nil { t.Fatal(err) } @@ -646,7 +646,7 @@ func TestLoadDirty(t *testing.T) { _, bk := randNode() - err = p.Pin(ctx, a, true) + err = p.Pin(ctx, a, true, "") if err != nil { t.Fatal(err) } @@ -787,7 +787,7 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs } // Pin last A recursively - if err = p.Pin(ctx, aNodes[aBranchLen-1], true); err != nil { + if err = p.Pin(ctx, aNodes[aBranchLen-1], true, ""); err != nil { return } @@ -820,12 +820,12 @@ func makeTree(ctx context.Context, aBranchLen int, dserv ipld.DAGService, p ipfs bk = b.Cid() // Pin C recursively - if err = p.Pin(ctx, c, true); err != nil { + if err = p.Pin(ctx, c, true, ""); err != nil { return } // Pin B recursively - if err = p.Pin(ctx, b, true); err != nil { + if err = p.Pin(ctx, b, true, ""); err != nil { return } @@ -857,7 +857,7 @@ func pinNodes(nodes []ipld.Node, p ipfspin.Pinner, recursive bool) { var err error for i := range nodes { - err = p.Pin(ctx, nodes[i], recursive) + err = p.Pin(ctx, nodes[i], recursive, "") if err != nil { panic(err) } @@ -975,7 +975,7 @@ func benchmarkNthPin(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld. which := count - 1 for i := 0; i < b.N; i++ { // Pin the Nth node and Flush - err := pinner.Pin(ctx, nodes[which], true) + err := pinner.Pin(ctx, nodes[which], true, "") if err != nil { panic(err) } @@ -1021,7 +1021,7 @@ func benchmarkNPins(b *testing.B, count int, pinner ipfspin.Pinner, dserv ipld.D for i := 0; i < b.N; i++ { // Pin all the nodes one at a time. for j := range nodes { - err := pinner.Pin(ctx, nodes[j], true) + err := pinner.Pin(ctx, nodes[j], true, "") if err != nil { panic(err) } diff --git a/pinning/pinner/pin.go b/pinning/pinner/pin.go index c5169bfee..64571342d 100644 --- a/pinning/pinner/pin.go +++ b/pinning/pinner/pin.go @@ -95,7 +95,7 @@ type Pinner interface { // Pin the given node, optionally recursively. // Pin will make sure that the given node and its children if recursive is set // are stored locally. - Pin(ctx context.Context, node ipld.Node, recursive bool) error + Pin(ctx context.Context, node ipld.Node, recursive bool, label string) error // Unpin the given cid. If recursive is true, removes either a recursive or // a direct pin. If recursive is false, only removes a direct pin. @@ -114,20 +114,20 @@ type Pinner interface { // PinWithMode is for manually editing the pin structure. Use with // care! If used improperly, garbage collection may not be // successful. - PinWithMode(context.Context, cid.Cid, Mode) error + PinWithMode(context.Context, cid.Cid, Mode, string) error // Flush writes the pin state to the backing datastore Flush(ctx context.Context) error // DirectKeys returns all directly pinned cids - DirectKeys(ctx context.Context) <-chan StreamedCid + DirectKeys(ctx context.Context) <-chan StreamedPin // RecursiveKeys returns all recursively pinned cids - RecursiveKeys(ctx context.Context) <-chan StreamedCid + RecursiveKeys(ctx context.Context) <-chan StreamedPin // InternalPins returns all cids kept pinned for the internal state of the // pinner - InternalPins(ctx context.Context) <-chan StreamedCid + InternalPins(ctx context.Context) <-chan StreamedPin } // Pinned represents CID which has been pinned with a pinning strategy. @@ -135,9 +135,10 @@ type Pinner interface { // case that the item is not pinned directly (but rather pinned recursively // by some ascendant). type Pinned struct { - Key cid.Cid - Mode Mode - Via cid.Cid + Key cid.Cid + Mode Mode + Label string + Via cid.Cid } // Pinned returns whether or not the given cid is pinned @@ -158,8 +159,8 @@ func (p Pinned) String() string { } } -// StreamedCid encapsulate a Cid and an error for a function to return a channel of Cids. -type StreamedCid struct { - C cid.Cid +// StreamedPin encapsulate a [Pin] and an error for a function to return a channel of [Pin]s. +type StreamedPin struct { + Pin Pinned Err error } diff --git a/provider/provider.go b/provider/provider.go index 6fb021695..6035fd92d 100644 --- a/provider/provider.go +++ b/provider/provider.go @@ -85,7 +85,7 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory logR.Errorf("reprovide direct pins: %s", sc.Err) return } - set.Visitor(ctx)(sc.C) + set.Visitor(ctx)(sc.Pin.Key) } session := fetchConfig.NewSession(ctx) @@ -94,9 +94,9 @@ func pinSet(ctx context.Context, pinning pin.Pinner, fetchConfig fetcher.Factory logR.Errorf("reprovide recursive pins: %s", sc.Err) return } - set.Visitor(ctx)(sc.C) + set.Visitor(ctx)(sc.Pin.Key) if !onlyRoots { - err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.C}, func(res fetcher.FetchResult) error { + err := fetcherhelpers.BlockAll(ctx, session, cidlink.Link{Cid: sc.Pin.Key}, func(res fetcher.FetchResult) error { clink, ok := res.LastBlockLink.(cidlink.Link) if ok { set.Visitor(ctx)(clink.Cid)