Skip to content

Commit

Permalink
refactor piece doctor
Browse files Browse the repository at this point in the history
  • Loading branch information
nonsense committed May 25, 2023
1 parent 3918e94 commit 578382e
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 325 deletions.
9 changes: 5 additions & 4 deletions cmd/boostd/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-commp-utils/writer"
Expand Down Expand Up @@ -190,14 +191,14 @@ func action(cctx *cli.Context) error {
if ignoreLID {
pd = nil
} else {
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
pd = piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd = piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
pd.Start(ctx)
}

Expand Down
13 changes: 7 additions & 6 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/shared/tracing"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -157,9 +158,9 @@ var runCmd = &cli.Command{
defer storageCloser()

// Connect to the local index directory service
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err = pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err = cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -196,8 +197,8 @@ var runCmd = &cli.Command{
return fmt.Errorf("starting block filter: %w", err)
}
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(piecedirectory, &bitswapBlockMetrics)
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))
remoteStore := remoteblockstore.NewRemoteBlockstore(pd, &bitswapBlockMetrics)
server := NewBitswapServer(remoteStore, host, multiFilter)

var proxyAddrInfo *peer.AddrInfo
Expand All @@ -210,7 +211,7 @@ var runCmd = &cli.Command{
}

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the bitswap server
log.Infof("Starting booster-bitswap node on port %d", port)
Expand Down
15 changes: 8 additions & 7 deletions cmd/booster-http/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/filecoin-project/boost/cmd/lib/remoteblockstore"
"github.com/filecoin-project/boost/metrics"
"github.com/filecoin-project/boost/piecedirectory"
bdclient "github.com/filecoin-project/boostd-data/client"
"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/shared/tracing"
"github.com/filecoin-project/dagstore/mount"
Expand Down Expand Up @@ -127,9 +128,9 @@ var runCmd = &cli.Command{

// Connect to the local index directory service
ctx := lcli.ReqContext(cctx)
pdClient := piecedirectory.NewStore()
defer pdClient.Close(ctx)
err := pdClient.Dial(ctx, cctx.String("api-lid"))
cl := bdclient.NewStore()
defer cl.Close(ctx)
err := cl.Dial(ctx, cctx.String("api-lid"))
if err != nil {
return fmt.Errorf("connecting to local index directory service: %w", err)
}
Expand Down Expand Up @@ -168,7 +169,7 @@ var runCmd = &cli.Command{

// Create the server API
pr := &piecedirectory.SectorAccessorAsPieceReader{SectorAccessor: sa}
piecedirectory := piecedirectory.NewPieceDirectory(pdClient, pr, cctx.Int("add-index-throttle"))
pd := piecedirectory.NewPieceDirectory(cl, pr, cctx.Int("add-index-throttle"))

opts := &HttpServerOptions{
ServePieces: servePieces,
Expand Down Expand Up @@ -199,11 +200,11 @@ var runCmd = &cli.Command{
GetSizeFailResponseCount: metrics.HttpRblsGetSizeFailResponseCount,
GetSizeSuccessResponseCount: metrics.HttpRblsGetSizeSuccessResponseCount,
}
rbs := remoteblockstore.NewRemoteBlockstore(piecedirectory, &httpBlockMetrics)
rbs := remoteblockstore.NewRemoteBlockstore(pd, &httpBlockMetrics)
filtered := filters.NewFilteredBlockstore(rbs, multiFilter)
opts.Blockstore = filtered
}
sapi := serverApi{ctx: ctx, piecedirectory: piecedirectory, sa: sa}
sapi := serverApi{ctx: ctx, piecedirectory: pd, sa: sa}
server := NewHttpServer(
cctx.String("base-path"),
cctx.Int("port"),
Expand All @@ -212,7 +213,7 @@ var runCmd = &cli.Command{
)

// Start the local index directory
piecedirectory.Start(ctx)
pd.Start(ctx)

// Start the server
log.Infof("Starting booster-http node on port %d with base path '%s'",
Expand Down
21 changes: 21 additions & 0 deletions extern/boostd-data/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/filecoin-project/boostd-data/model"
"github.com/filecoin-project/boostd-data/svc"
"github.com/filecoin-project/boostd-data/svc/types"
"github.com/filecoin-project/go-jsonrpc"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -46,6 +47,26 @@ func NewStore(dialOpts ...jsonrpc.Option) *Store {
return &Store{dialOpts: dialOpts}
}

func NewTestStore(ctx context.Context) *Store {
bdsvc, err := svc.NewLevelDB("")
if err != nil {
panic(err)
}
addr := "localhost:0"
err = bdsvc.Start(ctx, addr)
if err != nil {
panic(err)
}

cl := NewStore()
err = cl.Dial(ctx, fmt.Sprintf("ws://%s", addr))
if err != nil {
panic(err)
}

return cl
}

func (s *Store) Dial(ctx context.Context, addr string) error {
var err error
s.closer, err = jsonrpc.NewMergeClient(ctx, addr, "boostddata", []interface{}{&s.client}, nil, s.dialOpts...)
Expand Down
178 changes: 175 additions & 3 deletions indexprovider/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ import (
"fmt"
"os"
"path/filepath"
"time"

"github.com/filecoin-project/boost-gfm/storagemarket"
gfm_storagemarket "github.com/filecoin-project/boost-gfm/storagemarket"
"github.com/filecoin-project/boost/db"
"github.com/filecoin-project/boost/markets/idxprov"
"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/piecedirectory"
"github.com/filecoin-project/boost/sectorstatemgr"
"github.com/filecoin-project/boost/storagemarket/types"
"github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints"
"github.com/filecoin-project/go-address"
cborutil "github.com/filecoin-project/go-cbor-util"
"github.com/filecoin-project/go-state-types/abi"
lotus_modules "github.com/filecoin-project/lotus/node/modules"
"github.com/filecoin-project/lotus/node/repo"
"github.com/hashicorp/go-multierror"
Expand All @@ -33,12 +39,14 @@ var log = logging.Logger("index-provider-wrapper")
var defaultDagStoreDir = "dagstore"

type Wrapper struct {
enabled bool

cfg *config.Boost
enabled bool
dealsDB *db.DealsDB
legacyProv gfm_storagemarket.StorageProvider
prov provider.Interface
piecedirectory *piecedirectory.PieceDirectory
ssm *sectorstatemgr.SectorStateMgr
meshCreator idxprov.MeshCreator
h host.Host
// bitswapEnabled records whether to announce bitswap as an available
Expand All @@ -49,11 +57,12 @@ type Wrapper struct {

func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dealsDB *db.DealsDB,
ssDB *db.SectorStateDB, legacyProv gfm_storagemarket.StorageProvider, prov provider.Interface,
piecedirectory *piecedirectory.PieceDirectory, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {
piecedirectory *piecedirectory.PieceDirectory, ssm *sectorstatemgr.SectorStateMgr, meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {

return func(lc fx.Lifecycle, h host.Host, r repo.LockedRepo, dealsDB *db.DealsDB,
ssDB *db.SectorStateDB, legacyProv gfm_storagemarket.StorageProvider, prov provider.Interface,
piecedirectory *piecedirectory.PieceDirectory,
ssm *sectorstatemgr.SectorStateMgr,
meshCreator idxprov.MeshCreator, storageService lotus_modules.MinerStorageService) (*Wrapper, error) {

if cfg.DAGStore.RootDir == "" {
Expand All @@ -76,6 +85,7 @@ func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.Loc
enabled: !isDisabled,
piecedirectory: piecedirectory,
bitswapEnabled: bitswapEnabled,
ssm: ssm,
}
return w, nil
}
Expand All @@ -84,7 +94,7 @@ func NewWrapper(cfg *config.Boost) func(lc fx.Lifecycle, h host.Host, r repo.Loc
func (w *Wrapper) Start(ctx context.Context) {
w.prov.RegisterMultihashLister(w.MultihashLister)

runCtx, runCancel := context.WithCancel(context.Background())
runCtx, runCancel := context.WithCancel(ctx)
w.stop = runCancel

// Announce all deals on startup in case of a config change
Expand All @@ -94,6 +104,161 @@ func (w *Wrapper) Start(ctx context.Context) {
log.Warnf("announcing extended providers: %w", err)
}
}()

go func() {
duration := time.Duration(w.cfg.Storage.StorageListRefreshDuration)
log.Infof("starting index provider update interval %s", duration.String())
ticker := time.NewTicker(duration)
defer ticker.Stop()

// Check immediately
err := w.checkForUpdates(ctx)
if err != nil {
log.Errorw("checking for state updates", "err", err)
}

// Check every tick
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
err := w.checkForUpdates(ctx)
if err != nil {
log.Errorw("checking for state updates", "err", err)
}
}
}
}()
}

func (w *Wrapper) checkForUpdates(ctx context.Context) error {
legacyDeals, err := w.legacyDealsBySectorID(w.ssm.StateUpdates)
if err != nil {
return fmt.Errorf("getting legacy deals from datastore: %w", err)
}

log.Debugf("checking for sector state updates for %d states", len(w.ssm.StateUpdates))

// For each sector
for sectorID, sectorSealState := range w.ssm.StateUpdates {
// Get the deals in the sector
deals, err := w.dealsBySectorID(ctx, legacyDeals, sectorID)
if err != nil {
return fmt.Errorf("getting deals for miner %d / sector %d: %w", sectorID.Miner, sectorID.Number, err)
}
log.Debugf("sector %d has %d deals, seal status %s", sectorID, len(deals), sectorSealState)

// For each deal in the sector
for _, deal := range deals {
if !deal.AnnounceToIPNI {
continue
}

propnd, err := cborutil.AsIpld(&deal.DealProposal)
if err != nil {
return fmt.Errorf("failed to compute signed deal proposal ipld node: %w", err)
}
propCid := propnd.Cid()

if sectorSealState == db.SealStateRemoved {
// Announce deals that are no longer unsealed to indexer
announceCid, err := w.AnnounceBoostDealRemoved(ctx, propCid)
if err != nil {
// Check if the error is because the deal wasn't previously announced
if !errors.Is(err, provider.ErrContextIDNotFound) {
// There was some other error, write it to the log
log.Errorw("announcing deal removed to index provider",
"deal id", deal.DealID, "error", err)
continue
}
} else {
log.Infow("announced to index provider that deal has been removed",
"deal id", deal.DealID, "sector id", deal.SectorID.Number, "announce cid", announceCid.String())
}
} else if sectorSealState != db.SealStateCache {
// Announce deals that have changed seal state to indexer
md := metadata.GraphsyncFilecoinV1{
PieceCID: deal.DealProposal.Proposal.PieceCID,
FastRetrieval: sectorSealState == db.SealStateUnsealed,
VerifiedDeal: deal.DealProposal.Proposal.VerifiedDeal,
}
announceCid, err := w.AnnounceBoostDealMetadata(ctx, md, propCid)
if err == nil {
log.Infow("announced deal seal state to index provider",
"deal id", deal.DealID, "sector id", deal.SectorID.Number,
"seal state", sectorSealState, "announce cid", announceCid.String())
} else {
log.Errorf("announcing deal %s to index provider: %w", deal.DealID, err)
}
}
}
}

return nil
}

// Get deals by sector ID, whether they're legacy or boost deals
func (w *Wrapper) dealsBySectorID(ctx context.Context, legacyDeals map[abi.SectorID][]storagemarket.MinerDeal, sectorID abi.SectorID) ([]basicDealInfo, error) {
// First query the boost database
deals, err := w.dealsDB.BySectorID(ctx, sectorID)
if err != nil {
return nil, fmt.Errorf("getting deals from boost database: %w", err)
}

basicDeals := make([]basicDealInfo, 0, len(deals))
for _, dl := range deals {
basicDeals = append(basicDeals, basicDealInfo{
AnnounceToIPNI: dl.AnnounceToIPNI,
DealID: dl.DealUuid.String(),
SectorID: sectorID,
DealProposal: dl.ClientDealProposal,
})
}

// Then check the legacy deals
legDeals, ok := legacyDeals[sectorID]
if ok {
for _, dl := range legDeals {
basicDeals = append(basicDeals, basicDealInfo{
AnnounceToIPNI: true,
DealID: dl.ProposalCid.String(),
SectorID: sectorID,
DealProposal: dl.ClientDealProposal,
})
}
}

return basicDeals, nil
}

// Iterate over all legacy deals and make a map of sector ID -> legacy deal.
// To save memory, only include legacy deals with a sector ID that we know
// we're going to query, ie the set of sector IDs in the stateUpdates map.
func (w *Wrapper) legacyDealsBySectorID(stateUpdates map[abi.SectorID]db.SealState) (map[abi.SectorID][]storagemarket.MinerDeal, error) {
legacyDeals, err := w.legacyProv.ListLocalDeals()
if err != nil {
return nil, err
}

bySectorID := make(map[abi.SectorID][]storagemarket.MinerDeal, len(legacyDeals))
for _, deal := range legacyDeals {
minerID, err := address.IDFromAddress(deal.Proposal.Provider)
if err != nil {
// just skip the deal if we can't convert its address to an ID address
continue
}
sectorID := abi.SectorID{
Miner: abi.ActorID(minerID),
Number: deal.SectorNumber,
}
_, ok := w.ssm.StateUpdates[sectorID]
if ok {
bySectorID[sectorID] = append(bySectorID[sectorID], deal)
}
}

return bySectorID, nil
}

func (w *Wrapper) Stop() {
Expand Down Expand Up @@ -381,3 +546,10 @@ func (w *Wrapper) AnnounceBoostDealRemoved(ctx context.Context, propCid cid.Cid)
}
return annCid, err
}

type basicDealInfo struct {
AnnounceToIPNI bool
DealID string
SectorID abi.SectorID
DealProposal storagemarket.ClientDealProposal
}
Loading

0 comments on commit 578382e

Please sign in to comment.