Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 75 additions & 56 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,35 +646,9 @@ func (p *pinner) CheckIfPinnedWithType(ctx context.Context, mode ipfspinner.Mode

// Check for indirect pins
if toCheck.Len() > 0 {
var walkErr error
visited := cid.NewSet()
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var rk cid.Cid
rk, walkErr = cid.Cast([]byte(key))
if walkErr != nil {
return false
}
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
if toCheck.Len() == 0 || !visited.Visit(c) {
return false
}
if toCheck.Has(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
toCheck.Remove(c)
}
return true
}, merkledag.Concurrent())
if walkErr != nil {
return false
}
return toCheck.Len() > 0
})
if err != nil {
if err := p.traverseIndirectPins(ctx, toCheck, &pinned); err != nil {
return nil, err
}
if walkErr != nil {
return nil, walkErr
}
}

// Anything left in toCheck is not pinned
Expand Down Expand Up @@ -741,48 +715,93 @@ func (p *pinner) checkPinsInIndex(ctx context.Context, mode ipfspinner.Mode, inc
return pinned, nil
}

// traverseIndirectPins is a helper that traverses all recursive pins to find indirect pins.
// It modifies the pinned slice and toCheck set in place.
func (p *pinner) traverseIndirectPins(ctx context.Context, toCheck *cid.Set, pinned *[]ipfspinner.Pinned) error {
var walkErr error
visited := cid.NewSet()
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
// Check for context cancellation at the start of each recursive pin
select {
case <-ctx.Done():
walkErr = ctx.Err()
return false
default:
}

var rk cid.Cid
rk, walkErr = cid.Cast([]byte(key))
if walkErr != nil {
return false
}
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
if toCheck.Len() == 0 || !visited.Visit(c) {
return false
}
if toCheck.Has(c) {
*pinned = append(*pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
toCheck.Remove(c)
}
return true
}, merkledag.Concurrent())
if walkErr != nil {
return false
}
return toCheck.Len() > 0
})
if err != nil {
return err
}
if walkErr != nil {
return walkErr
}
return nil
}

// checkIndirectPins checks if the given cids are pinned indirectly
func (p *pinner) checkIndirectPins(ctx context.Context, cids ...cid.Cid) ([]ipfspinner.Pinned, error) {
pinned := make([]ipfspinner.Pinned, 0, len(cids))
toCheck := cid.NewSet()

// Check all CIDs for indirect pins, regardless of their direct pin status
// A CID can be both directly pinned AND indirectly pinned through a parent
// Filter out CIDs that are recursively pinned at the root level.
// A recursively pinned CID is not considered indirect because recursive pins
// are comprehensive (include all children), making "recursive" take precedence
// over "indirect".
//
// However, we do NOT filter out direct pins here. Direct pins only pin a
// single block, not its children. Therefore, a CID can legitimately be both:
// - Directly pinned (explicitly pinned as a single block)
// - Indirectly pinned (referenced by another pinned object's DAG)
// This is why the asymmetry between recursive and direct pins is intentional.
//
// NOTE: While this behavior may feel arbitrary, we preserve it for compatibility
// as this is how 'ipfs pin ls' has behaved for nearly a decade. The test
// t0081-repo-pinning.sh in Kubo explicitly expects a CID to be both direct
// and indirect, guarding this established behavior.
for _, c := range cids {
cidKey := c.KeyString()

// Check if recursively pinned
ids, err := p.cidRIndex.Search(ctx, cidKey)
if err != nil {
return nil, err
}
if len(ids) > 0 {
// This CID is recursively pinned at root level, not indirect
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.NotPinned})
continue
}

// Still check for indirect even if directly pinned
// A CID can be both direct and indirect
toCheck.Add(c)
}

// Now check for indirect pins by traversing recursive pins
if toCheck.Len() > 0 {
var walkErr error
visited := cid.NewSet()
err := p.cidRIndex.ForEach(ctx, "", func(key, value string) bool {
var rk cid.Cid
rk, walkErr = cid.Cast([]byte(key))
if walkErr != nil {
return false
}
walkErr = merkledag.Walk(ctx, merkledag.GetLinksWithDAG(p.dserv), rk, func(c cid.Cid) bool {
if toCheck.Len() == 0 || !visited.Visit(c) {
return false
}
if toCheck.Has(c) {
pinned = append(pinned, ipfspinner.Pinned{Key: c, Mode: ipfspinner.Indirect, Via: rk})
toCheck.Remove(c)
}
return true
}, merkledag.Concurrent())
if walkErr != nil {
return false
}
return toCheck.Len() > 0
})
if err != nil {
if err := p.traverseIndirectPins(ctx, toCheck, &pinned); err != nil {
return nil, err
}
if walkErr != nil {
return nil, walkErr
}
}

// Anything left in toCheck is not pinned
Expand Down
55 changes: 55 additions & 0 deletions pinning/pinner/dspinner/pin_withtype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package dspinner

import (
"context"
"fmt"
"testing"
"time"

bs "github.com/ipfs/boxo/blockservice"
blockstore "github.com/ipfs/boxo/blockstore"
Expand Down Expand Up @@ -155,4 +157,57 @@ func TestCheckIfPinnedWithType(t *testing.T) {

require.Equal(t, pinned1, pinned2)
})

t.Run("Context cancellation during indirect check with many pins", func(t *testing.T) {
// Create many recursive pins to ensure we can hit the cancellation
nodes := make([]cid.Cid, 50)
for i := 0; i < 50; i++ {
node := mdag.NodeWithData([]byte(fmt.Sprintf("recursive node %d", i)))
err = dserv.Add(ctx, node)
require.NoError(t, err)
nodes[i] = node.Cid()

p.PinWithMode(ctx, nodes[i], ipfspin.Recursive, fmt.Sprintf("recursive-%d", i))
}
err = p.Flush(ctx)
require.NoError(t, err)

// Create a context that we can cancel
cancelCtx, cancel := context.WithCancel(ctx)

// Start checking in a goroutine
done := make(chan struct{})
var checkErr error
go func() {
defer close(done)
// Try to check for an indirect pin with many recursive pins to traverse
_, checkErr = p.CheckIfPinnedWithType(cancelCtx, ipfspin.Indirect, withoutNames, ck)
}()

// Cancel the context quickly
time.Sleep(1 * time.Millisecond)
cancel()

// Wait for the check to complete
<-done

// Should get a context cancellation error (or nil if it completed too fast)
// The important thing is that it doesn't hang
if checkErr != nil {
require.Equal(t, context.Canceled, checkErr)
}
})

t.Run("Context cancellation in checkIndirectPins", func(t *testing.T) {
// Create a context that we can cancel immediately
cancelCtx, cancel := context.WithCancel(ctx)
cancel() // Cancel immediately

// Try to check for indirect pins with cancelled context
_, err = p.CheckIfPinnedWithType(cancelCtx, ipfspin.Indirect, withoutNames, ck)

// Should get a context cancellation error
require.Error(t, err)
require.Equal(t, context.Canceled, err)
})
}
Loading