Skip to content

Commit

Permalink
feat: implement indexer provider api
Browse files Browse the repository at this point in the history
  • Loading branch information
simlecode committed Jul 19, 2024
1 parent 0c02960 commit 18dc5b1
Show file tree
Hide file tree
Showing 9 changed files with 373 additions and 114 deletions.
114 changes: 114 additions & 0 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package impl
import (
"context"
"fmt"
"io"
"os"
"sort"
"time"
Expand All @@ -24,12 +25,14 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"github.com/pkg/errors"

"github.com/ipfs-force-community/sophon-auth/jwtclient"

clients2 "github.com/ipfs-force-community/droplet/v2/api/clients"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/indexprovider"
"github.com/ipfs-force-community/droplet/v2/minermgr"
"github.com/ipfs-force-community/droplet/v2/models/repo"
"github.com/ipfs-force-community/droplet/v2/network"
Expand Down Expand Up @@ -66,6 +69,7 @@ type MarketNodeImpl struct {
DataTransfer network.ProviderDataTransfer
DealPublisher *storageprovider.DealPublisher
DealAssigner storageprovider.DealAssiger
IndexProviderMgr *indexprovider.IndexProviderMgr

DirectDealProvider *storageprovider.DirectDealProvider

Expand Down Expand Up @@ -1392,3 +1396,113 @@ func (m *MarketNodeImpl) UpdateDirectDealState(ctx context.Context, id uuid.UUID

return m.Repo.DirectDealRepo().SaveDeal(ctx, deal)
}

func (m *MarketNodeImpl) IndexerAnnounceAllDeals(ctx context.Context, minerAddr address.Address) error {
return m.IndexProviderMgr.IndexAnnounceAllDeals(ctx, minerAddr)
}

func (m *MarketNodeImpl) getDeal(ctx context.Context, contextID []byte) (any, bool, error) {
propCID, err := cid.Cast(contextID)
if err == nil {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, propCID)
if err != nil {
return address.Address{}, false, err
}
return deal, false, nil
}
dealUUID, err := uuid.FromBytes(contextID)
if err != nil {
return address.Address{}, false, err
}

directDeal, err := m.Repo.DirectDealRepo().GetDeal(ctx, dealUUID)
if err == nil {
return directDeal, true, nil
}

deal, err := m.Repo.StorageDealRepo().GetDealByUUID(ctx, dealUUID)
if err != nil {
return address.Address{}, false, err
}

return deal, false, nil
}

func (m *MarketNodeImpl) IndexerListMultihashes(ctx context.Context, contextID []byte) ([]multihash.Multihash, error) {
deal, isDDO, err := m.getDeal(ctx, contextID)
if err != nil {
return nil, err
}
var miner address.Address
if isDDO {
miner = deal.(*types.DirectDeal).Provider
} else {
miner = deal.(*types.MinerDeal).Proposal.Provider
}

it, err := m.IndexProviderMgr.MultihashLister(ctx, miner, "", contextID)
if err != nil {
return nil, err
}

var mhs []multihash.Multihash
mh, err := it.Next()
for {
if err != nil {
if errors.Is(err, io.EOF) {
return mhs, nil
}
return nil, err
}
mhs = append(mhs, mh)

mh, err = it.Next()
}
}

func (m *MarketNodeImpl) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) {
var c cid.Cid
var err error
for _, miner := range m.Config.Miners {
c, err = m.IndexProviderMgr.IndexerAnnounceLatest(ctx, address.Address(miner.Addr))
if err != nil {
return c, err
}
}

return c, nil
}

func (m *MarketNodeImpl) IndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) {
var c cid.Cid
var err error
for _, miner := range m.Config.Miners {
c, err = m.IndexProviderMgr.IndexerAnnounceLatestHttp(ctx, address.Address(miner.Addr), urls)
if err != nil {
return c, err
}
}

return c, nil
}

func (m *MarketNodeImpl) IndexerAnnounceDealRemoved(ctx context.Context, propCid cid.Cid) (cid.Cid, error) {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, propCid)
if err != nil {
return cid.Undef, err
}

return m.IndexProviderMgr.AnnounceDealRemoved(ctx, deal.Proposal.Provider, propCid)
}

func (m *MarketNodeImpl) IndexerAnnounceDeal(ctx context.Context, contextID []byte) (cid.Cid, error) {
deal, isDDO, err := m.getDeal(ctx, contextID)
if err != nil {
return cid.Undef, err
}
if isDDO {
return m.IndexProviderMgr.AnnounceDirectDeal(ctx, deal.(*types.DirectDeal))
}

return m.IndexProviderMgr.AnnounceDeal(ctx, deal.(*types.MinerDeal))
}
2 changes: 1 addition & 1 deletion config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func defaultProviderConfig() *ProviderConfig {
HTTPRetrievalMultiaddr: "",

IndexProvider: IndexProviderConfig{
Enable: false,
Enable: true,
EntriesCacheCapacity: 1024,
EntriesChunkSize: 16384,
TopicName: "",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.16.0-rc1.0.20240711070424-0278ec1b0331
github.com/filecoin-project/venus v1.16.0-rc1.0.20240719060644-524bcbcc285e
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/mux v1.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,8 @@ github.com/filecoin-project/specs-actors/v8 v8.0.1 h1:4u0tIRJeT5G7F05lwLRIsDnsrN
github.com/filecoin-project/specs-actors/v8 v8.0.1/go.mod h1:UYIPg65iPWoFw5NEftREdJwv9b/5yaLKdCgTvNI/2FA=
github.com/filecoin-project/specs-storage v0.4.1 h1:yvLEaLZj8f+uByhNC4mFOtCUyL2wQku+NGBp6hjTe9M=
github.com/filecoin-project/specs-storage v0.4.1/go.mod h1:Z2eK6uMwAOSLjek6+sy0jNV2DSsMEENziMUz0GHRFBw=
github.com/filecoin-project/venus v1.16.0-rc1.0.20240711070424-0278ec1b0331 h1:lfCbamo9eRNlgwMclEu1PHTexr8AIg5V6Pm7+Qh551A=
github.com/filecoin-project/venus v1.16.0-rc1.0.20240711070424-0278ec1b0331/go.mod h1:tKy8zCgGOVDPzjFfHOZ7YRkHsmKsub5ROC138pLBLAY=
github.com/filecoin-project/venus v1.16.0-rc1.0.20240719060644-524bcbcc285e h1:T5TSW4Y7yyeYkyjergZcyPamJt7SbDsPftGiQxnjPEo=
github.com/filecoin-project/venus v1.16.0-rc1.0.20240719060644-524bcbcc285e/go.mod h1:nxXNLwFoD9RogfvFc4Zz4n95aTENHo+/E0MmTEvxRB8=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
Expand Down
56 changes: 8 additions & 48 deletions indexprovider/index_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import (
"github.com/multiformats/go-multihash"
"go.mongodb.org/mongo-driver/mongo"

"github.com/filecoin-project/go-fil-markets/stores"
v1 "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
types "github.com/filecoin-project/venus/venus-shared/types/market"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/dagstore"
"github.com/ipfs-force-community/droplet/v2/models/repo"
)

Expand All @@ -34,7 +34,7 @@ type Wrapper struct {
enabled bool

h host.Host
dagWrapper *dagstore.Wrapper
dagStore stores.DAGStoreWrapper
full v1.FullNode
cfg *config.ProviderConfig
dealsDB repo.StorageDealRepo
Expand All @@ -53,7 +53,7 @@ func NewWrapper(h host.Host,
cfg *config.ProviderConfig,
full v1.FullNode,
r repo.Repo,
dagWrapper *dagstore.Wrapper,
dagStore stores.DAGStoreWrapper,
prov provider.Interface,
) (*Wrapper, error) {
_, isDisabled := prov.(*DisabledIndexProvider)
Expand All @@ -76,7 +76,7 @@ func NewWrapper(h host.Host,
bitswapEnabled: bitswapEnabled,
httpEnabled: httpEnabled,
full: full,
dagWrapper: dagWrapper,
dagStore: dagStore,
}

return w, nil
Expand Down Expand Up @@ -317,7 +317,7 @@ func (w *Wrapper) MultihashLister(ctx context.Context, prov peer.ID, contextID [
idName = "UUID"
}
llog := log.With(idName, identifier, "piece", pieceCid)
ii, err := w.dagWrapper.GetIterableIndexForPiece(pieceCid)
ii, err := w.dagStore.GetIterableIndexForPiece(pieceCid)
if err != nil {
e := fmt.Errorf("failed to get iterable index: %w", err)
if strings.Contains(err.Error(), "file does not exist") ||
Expand Down Expand Up @@ -421,13 +421,12 @@ func (w *Wrapper) AnnounceDeal(ctx context.Context, deal *types.MinerDeal) (cid.
// return cid.Undef, nil
// }

propCid := deal.ProposalCid
md := metadata.GraphsyncFilecoinV1{
PieceCID: deal.ClientDealProposal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.ClientDealProposal.Proposal.VerifiedDeal,
}
return w.AnnounceDealMetadata(ctx, md, propCid.Bytes())
return w.AnnounceDealMetadata(ctx, md, deal.ProposalCid.Bytes())
}

func (w *Wrapper) AnnounceDealMetadata(ctx context.Context, md metadata.GraphsyncFilecoinV1, contextID []byte) (cid.Cid, error) {
Expand All @@ -454,7 +453,7 @@ func (w *Wrapper) AnnounceDealMetadata(ctx context.Context, md metadata.Graphsyn
return annCid, nil
}

func (w *Wrapper) AnnounceDealRemoved(ctx context.Context, propCid cid.Cid) (cid.Cid, error) {
func (w *Wrapper) AnnounceDealRemoved(ctx context.Context, contextID []byte) (cid.Cid, error) {
if !w.enabled {
return cid.Undef, errors.New("cannot announce deal removal: index provider is disabled")
}
Expand All @@ -466,28 +465,13 @@ func (w *Wrapper) AnnounceDealRemoved(ctx context.Context, propCid cid.Cid) (cid
}

// Announce deal removal to network Indexer
annCid, err := w.prov.NotifyRemove(ctx, "", propCid.Bytes())
annCid, err := w.prov.NotifyRemove(ctx, "", contextID)
if err != nil {
return cid.Undef, fmt.Errorf("failed to announce deal removal to index provider: %w", err)
}
return annCid, err
}

func (w *Wrapper) AnnounceLegcayDealToIndexer(ctx context.Context, proposalCid cid.Cid) (cid.Cid, error) {
deal, err := w.dealsDB.GetDeal(ctx, proposalCid)
if err != nil {
return cid.Undef, fmt.Errorf("failed getting deal %s: %w", proposalCid, err)
}

mt := metadata.GraphsyncFilecoinV1{
PieceCID: deal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.Proposal.VerifiedDeal,
}

return w.AnnounceDealMetadata(ctx, mt, proposalCid.Bytes())
}

func (w *Wrapper) AnnounceDirectDeal(ctx context.Context, entry *types.DirectDeal) (cid.Cid, error) {
// Filter out deals that should not be announced
// if !entry.AnnounceToIPNI {
Expand All @@ -506,27 +490,3 @@ func (w *Wrapper) AnnounceDirectDeal(ctx context.Context, entry *types.DirectDea
}
return w.AnnounceDealMetadata(ctx, md, contextID)
}

func (w *Wrapper) AnnounceDirectDealRemoved(ctx context.Context, dealUUID uuid.UUID) (cid.Cid, error) {
if !w.enabled {
return cid.Undef, errors.New("cannot announce deal removal: index provider is disabled")
}

// Ensure we have a connection with the full node host so that the index provider gossip sub announcements make their
// way to the filecoin bootstrapper network
if err := w.meshCreator.Connect(ctx); err != nil {
log.Errorw("failed to connect node to full daemon node", "err", err)
}

contextID, err := dealUUID.MarshalBinary()
if err != nil {
return cid.Undef, fmt.Errorf("marshalling the deal UUID: %w", err)
}

// Announce deal removal to network Indexer
annCid, err := w.prov.NotifyRemove(ctx, "", contextID)
if err != nil {
return cid.Undef, fmt.Errorf("failed to announce deal removal to index provider: %w", err)
}
return annCid, err
}
Loading

0 comments on commit 18dc5b1

Please sign in to comment.