Skip to content

Commit

Permalink
fix: signal to index provider to skip announcements
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed May 18, 2023
1 parent 961657b commit 9f1a06b
Showing 1 changed file with 63 additions and 10 deletions.
73 changes: 63 additions & 10 deletions indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package indexprovider

import (
"context"
"database/sql"
"errors"
"fmt"
"github.com/filecoin-project/dagstore/index"
"github.com/ipfs/go-datastore"
"io/fs"
"math"
"os"
"path/filepath"
Expand Down Expand Up @@ -262,47 +266,96 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error {
return merr
}

// ErrStringSkipAdIngest is the marker text placed at the front of errors
// returned while ingesting cids for a piece. When ingestion hits an error,
// the indexer checks whether the error contains the string "content not found":
//   - if it does, the indexer skips the piece and continues ingestion
//   - if it does not, the indexer pauses ingestion
var ErrStringSkipAdIngest = "content not found"

// skipError wraps err so that the resulting message begins with
// ErrStringSkipAdIngest, while err itself remains reachable through
// error unwrapping (it is attached with %w).
func skipError(err error) error {
	marked := fmt.Errorf("%s: %w", ErrStringSkipAdIngest, err)
	return marked
}

func (w *Wrapper) Start(ctx context.Context) {
// re-init dagstore shards for Boost deals if needed
if _, err := w.DagstoreReinitBoostDeals(ctx); err != nil {
log.Errorw("failed to migrate dagstore indices for Boost deals", "err", err)
}

w.prov.RegisterMultihashLister(func(ctx context.Context, pid peer.ID, contextID []byte) (provider.MultihashIterator, error) {
provideF := func(pieceCid cid.Cid) (provider.MultihashIterator, error) {
provideF := func(proposalCid cid.Cid, pieceCid cid.Cid) (provider.MultihashIterator, error) {
ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid)
if err != nil {
return nil, fmt.Errorf("failed to get iterable index: %w", err)
e := fmt.Errorf("failed to get iterable index: %w", err)
if errors.Is(err, index.ErrNotFound) || errors.Is(err, fs.ErrNotExist) {
// If it's a not found error, skip over this piece and continue ingesting
log.Infow("skipping ingestion: piece not found", "piece", pieceCid, "propCid", proposalCid, "err", err)
return nil, skipError(e)
}

// Some other error, pause ingestion
log.Infow("pausing ingestion: error getting piece", "piece", pieceCid, "propCid", proposalCid, "err", err)
return nil, err
}

mhi, err := provider.CarMultihashIterator(ii)
if err != nil {
return nil, fmt.Errorf("failed to get mhiterator: %w", err)
// Bad index, skip over this piece and continue ingesting
err = fmt.Errorf("failed to get mhiterator: %w", err)
log.Infow("skipping ingestion", "piece", pieceCid, "propCid", proposalCid, "err", err)
return nil, skipError(err)
}

log.Debugw("returning piece iterator", "piece", pieceCid, "propCid", proposalCid, "err", err)
return mhi, nil
}

// convert context ID to proposal Cid
proposalCid, err := cid.Cast(contextID)
if err != nil {
return nil, fmt.Errorf("failed to cast context ID to a cid")
// Bad contextID, skip over this piece and continue ingesting
err = fmt.Errorf("failed to cast context ID to a cid")
log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err)
return nil, skipError(err)
}

// go from proposal cid -> piece cid by looking up deal in boost and if we can't find it there -> then markets
// check Boost deals DB
// Look up deal by proposal cid in the boost database.
// If we can't find it there check legacy markets DB.
pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid)
if boostErr == nil {
// Found the deal, get an iterator over the piece
pieceCid := pds.ClientDealProposal.Proposal.PieceCID
return provideF(pieceCid)
return provideF(proposalCid, pieceCid)
}

if !errors.Is(boostErr, sql.ErrNoRows) {
// It's not a "not found" error: there was a problem accessing the
// database. Pause ingestion until the user can fix the DB.
e := fmt.Errorf("getting deal with proposal cid %s from boost database: %w", proposalCid, boostErr)
log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e)
return nil, e
}

// check in legacy markets
// Deal was not found in boost DB - check in legacy markets
md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid)
if legacyErr == nil {
return provideF(md.Proposal.PieceCID)
// Found the deal, get an iterator over the piece
return provideF(proposalCid, md.Proposal.PieceCID)
}

if !errors.Is(legacyErr, datastore.ErrNotFound) {
// It's not a "not found" error: there was a problem accessing the
// database. Pause ingestion until the user can fix the DB.
e := fmt.Errorf("getting deal with proposal cid %s from Legacy Markets: %w", proposalCid, legacyErr)
log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e)
return nil, e
}

return nil, fmt.Errorf("failed to look up deal in Boost, err=%s and Legacy Markets, err=%s", boostErr, legacyErr)
// The deal was not found in the boost or legacy database.
// Skip this deal and continue ingestion.
err = fmt.Errorf("deal with proposal cid %s not found")
log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err)
return nil, skipError(err)
})

runCtx, runCancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 9f1a06b

Please sign in to comment.