Skip to content

Commit

Permalink
refactor: build indexes for legacy deals
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jun 27, 2023
1 parent d2a7df9 commit 0bd65f0
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 69 deletions.
5 changes: 4 additions & 1 deletion gql/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/fundmanager"
gqltypes "github.com/filecoin-project/boost/gql/types"
"github.com/filecoin-project/boost/lib/legacy"
"github.com/filecoin-project/boost/lib/mpoolmonitor"
"github.com/filecoin-project/boost/markets/storageadapter"
"github.com/filecoin-project/boost/node/config"
Expand Down Expand Up @@ -61,6 +62,7 @@ type resolver struct {
fundMgr *fundmanager.FundManager
storageMgr *storagemanager.StorageManager
provider *storagemarket.Provider
legacyDeals *legacy.LegacyDealsManager
legacyProv gfm_storagemarket.StorageProvider
legacyDT dtypes.ProviderDataTransfer
ps piecestore.PieceStore
Expand All @@ -73,7 +75,7 @@ type resolver struct {
mpool *mpoolmonitor.MpoolMonitor
}

func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *resolver {
func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo, h host.Host, dealsDB *db.DealsDB, logsDB *db.LogsDB, retDB *rtvllog.RetrievalLogDB, plDB *db.ProposalLogsDB, fundsDB *db.FundsDB, fundMgr *fundmanager.FundManager, storageMgr *storagemanager.StorageManager, spApi sealingpipeline.API, provider *storagemarket.Provider, legacyDeals *legacy.LegacyDealsManager, legacyProv gfm_storagemarket.StorageProvider, legacyDT dtypes.ProviderDataTransfer, ps piecestore.PieceStore, sa retrievalmarket.SectorAccessor, piecedirectory *piecedirectory.PieceDirectory, publisher *storageadapter.DealPublisher, fullNode v1api.FullNode, ssm *sectorstatemgr.SectorStateMgr, mpool *mpoolmonitor.MpoolMonitor) *resolver {
return &resolver{
ctx: ctx,
cfg: cfg,
Expand All @@ -87,6 +89,7 @@ func NewResolver(ctx context.Context, cfg *config.Boost, r lotus_repo.LockedRepo
fundMgr: fundMgr,
storageMgr: storageMgr,
provider: provider,
legacyDeals: legacyDeals,
legacyProv: legacyProv,
legacyDT: legacyDT,
ps: ps,
Expand Down
4 changes: 2 additions & 2 deletions gql/resolver_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func (r *resolver) LegacyDeals(ctx context.Context, args dealsArgs) (*legacyDeal
}, nil
}

func (r *resolver) LegacyDealsCount() (int32, error) {
dealCount, err := r.legacyProv.LocalDealCount()
func (r *resolver) LegacyDealsCount(ctx context.Context) (int32, error) {
dealCount, err := r.legacyDeals.DealCount(ctx)
if err != nil {
return 0, fmt.Errorf("getting deal count: %w", err)
}
Expand Down
77 changes: 40 additions & 37 deletions gql/resolver_piece.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package gql
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"

"github.com/filecoin-project/boost-gfm/retrievalmarket"
"github.com/filecoin-project/boost-gfm/storagemarket"
gqltypes "github.com/filecoin-project/boost/gql/types"
pdtypes "github.com/filecoin-project/boost/piecedirectory/types"
"github.com/filecoin-project/boostd-data/svc/types"
Expand Down Expand Up @@ -65,8 +65,10 @@ type pieceResolver struct {
}

type flaggedPieceResolver struct {
Piece *pieceResolver
CreatedAt graphql.Time
PieceCid string
IndexStatus *indexStatus
DealCount int32
CreatedAt graphql.Time
}

type piecesFlaggedArgs struct {
Expand Down Expand Up @@ -117,23 +119,38 @@ func (r *resolver) PiecesFlagged(ctx context.Context, args piecesFlaggedArgs) (*
return nil, err
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

var eg errgroup.Group
flaggedPieceResolvers := make([]*flaggedPieceResolver, 0, len(flaggedPieces))
for _, flaggedPiece := range flaggedPieces {
pieceResolver, err := r.pieceStatus(ctx, flaggedPiece.PieceCid, allLegacyDeals)
if err != nil {
return nil, err
}
flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
Piece: pieceResolver,
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
flaggedPiece := flaggedPiece
eg.Go(func() error {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, flaggedPiece.PieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
return pmErr
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return err
}

flaggedPieceResolvers = append(flaggedPieceResolvers, &flaggedPieceResolver{
PieceCid: flaggedPiece.PieceCid.String(),
IndexStatus: idxStatus,
DealCount: int32(len(pieceInfo.Deals)),
CreatedAt: graphql.Time{Time: flaggedPiece.CreatedAt},
})
return nil
})
}
err = eg.Wait()
if err != nil {
return nil, err
}

log.Infow("done")
return &flaggedPieceListResolver{
TotalCount: int32(count),
Pieces: flaggedPieceResolvers,
Expand Down Expand Up @@ -194,15 +211,12 @@ func (r *resolver) PiecesWithRootPayloadCid(ctx context.Context, args struct{ Pa
}

// Get legacy markets deals by payload cid
// TODO: add method to markets to filter deals by payload CID
allLegacyDeals, err := r.legacyProv.ListLocalDeals()
legacyDeals, err := r.legacyDeals.ByPayloadCid(ctx, payloadCid)
if err != nil {
return nil, err
}
for _, dl := range allLegacyDeals {
if dl.Ref.Root == payloadCid {
pieceCidSet[dl.ClientDealProposal.Proposal.PieceCID.String()] = struct{}{}
}
for _, dl := range legacyDeals {
pieceCidSet[dl.ClientDealProposal.Proposal.PieceCID.String()] = struct{}{}
}

pieceCids := make([]string, 0, len(pieceCidSet))
Expand Down Expand Up @@ -258,15 +272,6 @@ func (r *resolver) PieceStatus(ctx context.Context, args struct{ PieceCid string
return nil, fmt.Errorf("%s is not a valid piece cid", args.PieceCid)
}

allLegacyDeals, err := r.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

return r.pieceStatus(ctx, pieceCid, allLegacyDeals)
}

func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyDeals []storagemarket.MinerDeal) (*pieceResolver, error) {
// Get piece info from local index directory
pieceInfo, pmErr := r.piecedirectory.GetPieceMetadata(ctx, pieceCid)
if pmErr != nil && !types.IsNotFound(pmErr) {
Expand All @@ -280,11 +285,9 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get legacy markets deals by piece Cid
var legacyDeals []storagemarket.MinerDeal
for _, dl := range allLegacyDeals {
if dl.Ref.PieceCid != nil && *dl.Ref.PieceCid == pieceCid {
legacyDeals = append(legacyDeals, dl)
}
legacyDeals, err := r.legacyDeals.ByPieceCid(ctx, pieceCid)
if err != nil {
return nil, err
}

// Convert local index directory deals to graphQL format
Expand Down Expand Up @@ -357,7 +360,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}

// Get the state of the piece's index
idxStatus, err := r.getIndexStatus(ctx, pieceCid, pieceInfo, pmErr, deals)
idxStatus, err := r.getIndexStatus(pieceInfo, pmErr)
if err != nil {
return nil, err
}
Expand All @@ -370,7 +373,7 @@ func (r *resolver) pieceStatus(ctx context.Context, pieceCid cid.Cid, allLegacyD
}, nil
}

func (r *resolver) getIndexStatus(ctx context.Context, pieceCid cid.Cid, md pdtypes.PieceDirMetadata, mdErr error, deals []*pieceDealResolver) (*indexStatus, error) {
func (r *resolver) getIndexStatus(md pdtypes.PieceDirMetadata, mdErr error) (*indexStatus, error) {
var idxst IndexStatus
idxerr := ""

Expand Down
8 changes: 2 additions & 6 deletions gql/resolver_sealingpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,24 +228,20 @@ func (r *resolver) populateWaitDealsSectors(ctx context.Context, sectorNumbers [
}

// match not found in boost db - fallback to legacy deals list
lds, err := r.legacyProv.ListLocalDeals()
lds, err := r.legacyDeals.ByPublishCid(ctx, *publishCid)
if err != nil {
return nil, err
}

var j int
for ; j < len(lds); j++ {
l := lds[j]
if l.PublishCid == nil {
continue
}

lpcid, err := l.ClientDealProposal.Proposal.Cid()
if err != nil {
return nil, err
}

if l.PublishCid.Equals(*publishCid) && lpcid.Equals(dcid) {
if lpcid.Equals(dcid) {
break
}
}
Expand Down
4 changes: 3 additions & 1 deletion gql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ type PieceStatus {

type FlaggedPieceStatus {
CreatedAt: Time!
Piece: PieceStatus!
PieceCid: String!
IndexStatus: IndexStatus!
DealCount: Int!
}

type FlaggedPiecesList {
Expand Down
Loading

0 comments on commit 0bd65f0

Please sign in to comment.